## DS 2002 Midterm Project 

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw2"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.
Clearly, you won't get very far without having a database to work with. Here we demonstrate how we can *drop* a database if it already exists, and then *create* the new **sakila_dw2** database and *use* it as the target of all subsequent operations.

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;") #overwrites database to start over
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x18c8e012350>

### Create & Populate the Dimension Tables
At this point, we have to execute the script for **Lab 2c** which creates and populates a **Date Dimension** table.  Be certain to target this script to the new data warehouse database we just created in the preceding cell.  Later in this notebook we will integrate the **dim_date** table with the fact table by performing **lookup operations** to retreive the surrogate primary keys from the date dimension table that correspond with each **date** typed column in the fact table (e.g., rental_date, payment_date, create_date).

#### Extract Data from the Source Database Tables

In [5]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [6]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [7]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


#### Perform Any Necessary Transformations
Pandas DataFrames enable extensive data modification capabilities. Here we will start by simply dropping features (columns) that we don't believe provide any real value to our analytics solution. Examples include columns having a high percentage of NULL values, columns having large amounts of free-text, and columns having binary large object (BLOB) data such as images or other documents. Then, we will rename the primary key column (id) to conform with data warehouse design standards.

In [8]:
drop_cols = ['email'] #get rid of columns with no data or just text data (use list)
df_customers.drop(drop_cols, axis=1, inplace=True) #columns must be in format of list, axis 0=rows, axis 1=columns
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True) #dictionary filled with key : value pairs

df_customers.head(2)

#modifications are directly to dataframe instead of making a copy "inplace=True"

Unnamed: 0,customer_key,store_id,first_name,last_name,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [9]:

df_inventory.rename(columns={"inventory_id":"inventory_key"}, inplace=True) #dictionary filled with key : value pairs

df_inventory.head(2)

#modifications are directly to dataframe instead of making a copy "inplace=True"

Unnamed: 0,inventory_key,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [10]:
drop_cols = ['picture','email','password'] #get rid of columns with no data or just text data (use list)
df_staff.drop(drop_cols, axis=1, inplace=True) #columns must be in format of list, axis 0=rows, axis 1=columns
df_staff.rename(columns={"staff_id":"staff_key"}, inplace=True) #dictionary filled with key : value pairs

df_staff.head(2)

#modifications are directly to dataframe instead of making a copy "inplace=True"

Unnamed: 0,staff_key,first_name,last_name,address_id,store_id,active,username,last_update
0,1,Mike,Hillyer,3,1,1,Mike,2006-02-15 03:57:16
1,2,Jon,Stephens,4,2,1,Jon,2006-02-15 03:57:16


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables
Here I demonstrate how we can create an iterable data structure containing the values needed to correctly create and populate the new dimension tables. If you inspect this code listing carefully, you'll notice that it's a **list** containing a **set** *(or vector)* for each dimension table. Each **set** then contains the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, and the name we need to assign to the *primary_key* column.  With this *list of sets* defined, we can then call our **set_dataframe( )** function from within a **for *loop*** to create each *dimension* table.

In [11]:
db_operation = "insert"

#list of vectors (each vector is a row)
#1. name of new dimension table
#2. data
#3. unique identifier = primary key

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_inventory', df_inventory, 'inventory_key'),
          #('dim_rental', df_rental, 'rental_key'),
          ('dim_staff', df_staff, 'staff_key')]

In [12]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Create & Populate the Fact Table
Here we will learn two approaches to creating the *fact_orders* fact table. The first approach demonstrates that a carefully crafted SQL SELECT statement can be used to perform this task... *but what fun would that be.* Seriously though, this approach is quick and effect if you already have the query, but what if you didn't have the opportunity to view and work with the data beforehand?  What's more, you may be required to combine data from multiple sources, some of which may not be relational database management systems. Then, a simple SQL query won't do!  You would need to load the data from the various sources (e.g., database tables, CSV or JSON files, NoSQL document collections, API stream data) and then combine them into a single dataframe that you could then use to create a new database table. For this reason we'll see how we can retrieve the data, but we won't bother to use it for creating a new table... we already know how to do that using the **set_dataframe( )** function anyway.

#### First, you could simply use the SQL SELECT statement you authored in Lab 2 

In [13]:
sql_query = '''

SELECT `film`.`title`,
    `film`.`description`,
    `film`.`release_year`,
    `film`.`language_id`,
    `film`.`original_language_id`,
    `film`.`rental_duration`,
    `film`.`rental_rate`,
    `film`.`length`,
    `film`.`replacement_cost`,
    `film`.`rating`,
    `film`.`special_features`,
    `film`.`last_update`,
    `film_category`.`category_id`,
    `film_actor`.`actor_id`
FROM `sakila`.`film`
INNER JOIN sakila.film_category
ON film.film_id = film_category.film_id
LEFT OUTER JOIN sakila.film_actor
ON film.film_id = film_actor.film_id;

'''

df_fact_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_query)

df_fact_film.head(2)


Unnamed: 0,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,category_id,actor_id
0,AMADEUS HOLY,A Emotional Display of a Pioneer And a Technic...,2006,1,,6,0.99,113,20.99,PG,"Commentaries,Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,1,5.0
1,AMADEUS HOLY,A Emotional Display of a Pioneer And a Technic...,2006,1,,6,0.99,113,20.99,PG,"Commentaries,Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,1,27.0


#### Instead, Implement the solution using Pandas DataFrames to Craft the table

##### Get all the data from each of the four tables involved

In [14]:
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
drop_cols = ['description', 'language_id', 'original_language_id', 'title']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.head(2)

Unnamed: 0,film_id,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [15]:
sql_film_actor = "SELECT * FROM sakila.film_actor;"
df_film_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_actor)
df_film_actor.drop('last_update', axis=1, inplace=True)
df_film_actor.head(2)

Unnamed: 0,actor_id,film_id
0,1,1
1,1,23


In [16]:
sql_film_category = "SELECT * FROM sakila.film_category;"
df_film_category = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_category)
df_film_category.drop('last_update', axis=1, inplace=True)
df_film_category.head(2)

Unnamed: 0,film_id,category_id
0,1,6
1,2,11


In [17]:
sql_film_text = "SELECT * FROM sakila.film_text;"
df_film_text = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_text)
df_film_text.head(2)

Unnamed: 0,film_id,title,description
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...


##### Join the Film, FilmActor, FilmCategory, and FilmText DataFrames 
In this step we can now easily join the *Film*, *Film_actor*, *Film_category*, and *Film_text* dataframes. 

In [18]:
df_fact_film = pd.merge(df_film, df_film_actor, on='film_id', how='right')
df_fact_film.head(2)

Unnamed: 0,film_id,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,actor_id
0,1,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,1
1,23,2006,3,0.99,92,9.99,R,"Trailers,Deleted Scenes",2006-02-15 05:03:42,1


In [19]:
df_fact_film = pd.merge(df_fact_film, df_film_category, on='film_id', how='right')
df_fact_film.head(2)

Unnamed: 0,film_id,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,actor_id,category_id
0,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,1.0,6
1,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,10.0,6


In [20]:
df_fact_film = pd.merge(df_fact_film, df_film_text, on='film_id', how='right')
df_fact_film.head(2)

Unnamed: 0,film_id,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,actor_id,category_id,title,description
0,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,1.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42,10.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...


##### Get the Data from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_dataframe()** function. Also, be certain to cast the **full_date** column to the **datetime64** data type using the **.astype()** function that is native to Pandas DataFrame columns.

In [21]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


##### Lookup the DateKeys from the Date Dimension Table.
Next, for each date typed column in the fact table, lookup the corresponding Surrogate Primary Key column.

In [22]:

df_dim_last_update = df_dim_date.rename(columns={"date_key":"last_update_key","full_date":"last_update"})
df_fact_film.last_update = df_fact_film.last_update.astype('datetime64').dt.date
df_fact_film = pd.merge(df_fact_film, df_dim_last_update, on='last_update', how='left')
df_fact_film.drop(['last_update'], axis=1, inplace=True)
df_fact_film.head(2)

  df_fact_film.last_update = df_fact_film.last_update.astype('datetime64').dt.date


Unnamed: 0,film_id,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,actor_id,category_id,title,description,last_update_key
0,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",1.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0
1,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",10.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0


##### Perform any Additional Transformations
In this step we can prepare the DataFrame so that it defines exactly what we want to see created in the database.  Issues may include dropping unwanted columns, reordering the columns, and in our case, creating a new column to serve as the primary key.

In [23]:
df_fact_film.head(5)

Unnamed: 0,film_id,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,actor_id,category_id,title,description,last_update_key
0,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",1.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0
1,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",10.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0
2,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",20.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0
3,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",30.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0
4,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",40.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0


In [24]:
#rename foreign key columns

df_fact_film.rename(columns={"film_id":"film_key", "category_id":"category_key", "actor_id":"actor_key"}, inplace=True)


#insert a new column, with an ever-incrementing numeric value, to serve as the primary key
df_fact_film.insert(0, "fact_film_key", range(1, df_fact_film.shape[0]+1))
df_fact_film.head(2)

Unnamed: 0,fact_film_key,film_key,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,actor_key,category_key,title,description,last_update_key
0,1,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",1.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0
1,2,1,2006.0,6.0,0.99,86.0,20.99,PG,"Deleted Scenes,Behind the Scenes",10.0,6,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,20060215.0


##### Write the DataFrame Back to the Database


In [25]:
table_name = "fact_film"
primary_key = "fact_film_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_film, table_name, primary_key, db_operation)

## Using Python to Integrate MongoDB Data into an ETL Process
Modern Data Warehousing and Analytics solutions frequently use languages like Python or Scala to extract data from numerous sources, including relational database management systems, NoSQL database systems, real-time streaming endpoints and Data Lakes. These languages can then be used to perform many types of transformation before then loading the data into a variety of destinations including file systems and data warehouses. This data can then be consumed by data scientists or business analysts.

In this lab you will build upon the **sakila_dw2** dimensional database from Lab 3; however, you will be integrating new data sourced from an instance of MongoDB. The new data will be concerned with new business processes; inventory and purchasing. You will continue to interact with both the source systems (MongoDB and MySQL), and the destination system (the sakila_DW2 data warehouse) from a remote client running Python (Jupyter Notebooks). 

Just as in Lab 3, you will fetch data into Pandas DataFrames, perform all the necessary transformations in-memory on the client, and then push the newly transformed DataFrame to the RDBMS data warehouse using a Pandas function that will create the table and fill it with data with a single operation.

### Prerequisites:
#### Import the Necessary Libraries

In [26]:
import os
import json
import numpy
import datetime
import pandas as pd
import pymysql
import mysql.connector

import pymongo
from sqlalchemy import create_engine

In [27]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "midterm.30qzwpg"
atlas_user_name = "qeu5gn"
atlas_password = "twixie123"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila" ### Figure this out
dst_dbname = "sakila_dw2"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://qeu5gn:twixie123@midterm.30qzwpg.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [28]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populate MongoDB with Source Data
You only need to run this cell once; however, the operation is *idempotent*.  In other words, it can be run multiple times without changing the end result.

In [29]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"rental" : 'sakila_rental.json',
              "payment" : 'sakila_payment.json',
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        print(f"{file} was successfully loaded.")

        
client.close()        

Collection(Database(MongoClient(host=['ac-jsy8sst-shard-00-02.30qzwpg.mongodb.net:27017', 'ac-jsy8sst-shard-00-01.30qzwpg.mongodb.net:27017', 'ac-jsy8sst-shard-00-00.30qzwpg.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-qztxyb-shard-0', tls=True), 'sakila'), 'rental') was successfully loaded.
Collection(Database(MongoClient(host=['ac-jsy8sst-shard-00-02.30qzwpg.mongodb.net:27017', 'ac-jsy8sst-shard-00-01.30qzwpg.mongodb.net:27017', 'ac-jsy8sst-shard-00-00.30qzwpg.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-qztxyb-shard-0', tls=True), 'sakila'), 'payment') was successfully loaded.


### Create and Populate the New Dimension Tables
#### Extract Data from the Source MongoDB Collections Into DataFrames

In [142]:
# TODO: Extract data from the "Rental" collection

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_rental = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Perform Any Necessary Transformations to the DataFrames

In [143]:
df_rental.rename(columns={"rental_id":"rental_key"}, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

Here we will call our **set_dataframe( )** function to create each dimension table. This function expects a number of parameters including the usual connection information (e.g., user_id, password, MySQL server name and database), the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, the *name* we need to assign to the *primary_key* column, and finally, the database operation (insert or update). 

In [144]:
# TODO: Upload the "rental" dataframe to create the new "dim_rental" dimension table

dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Dimension Tables were Created.

In [145]:
# TODO: Validate the new "dim_rental" table in the sakila_dw2 data warehouse.

sql_rental = "SELECT * FROM sakila_dw2.dim_rental;"
df_dim_rental = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_rental)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


### Create and Populate the New Fact Tables
#### Extract Data from the Source MongoDB Collections Into DataFrames

In [146]:
query = {} # Select all elements (columns), and all documents (rows).

collection = "actor"

df_actor = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name,last_update
0,14,VIVIEN,BERGEN,2006-02-15 04:34:33
1,1,PENELOPE,GUINESS,2006-02-15 04:34:33


In [147]:
query = {} # Select all elements (columns), and all documents (rows).

collection = "payment"

df_payment = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Get the Data from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_dataframe()** function. Also, be certain to cast the **full_date** column to the **datetime64** data type using the **.astype()** function that is native to Pandas DataFrame columns.

In [148]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


##### Lookup the DateKeys from the Date Dimension Table.
Next, for each date typed column in the purchase orders fact tables, lookup the corresponding Surrogate Primary Key column.

In [149]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "submitted_date" Column.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_actor.last_update = df_actor.last_update.astype('datetime64').dt.date
df_actor = pd.merge(df_actor, df_dim_last_update, on='last_update', how='left')
df_actor.drop(['last_update'], axis=1, inplace=True)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name,last_update_key
0,14,VIVIEN,BERGEN,20060215
1,1,PENELOPE,GUINESS,20060215


In [150]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "" Column.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_payment.last_update = df_payment.last_update.astype('datetime64').dt.date
df_payment = pd.merge(df_payment, df_dim_last_update, on='last_update', how='left')
df_payment.drop(['last_update'], axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update_key
0,1,1,1,76,2.99,2005-05-25 11:30:37,20060215
1,2,1,1,573,0.99,2005-05-28 10:35:23,20060215


In [151]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "payment_date" Column.
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_payment.payment_date = df_payment.payment_date.astype('datetime64').dt.date
df_payment = pd.merge(df_payment, df_dim_payment_date, on='payment_date', how='left')
df_payment.drop(['payment_date'], axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,last_update_key,payment_date_key
0,1,1,1,76,2.99,20060215,20050525
1,2,1,1,573,0.99,20060215,20050528


#### Perform Any Necessary Transformations to the DataFrames

In [152]:

# Rename the Foreign Key Columns
column_name_map = {"actor_id" : "actor_key",
                  }

df_actor.rename(columns=column_name_map, inplace=True)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_actor.insert(0, "fact_actor_key", range(1, df_actor.shape[0]+1))
df_actor.head(2)

Unnamed: 0,fact_actor_key,actor_key,first_name,last_name,last_update_key
0,1,14,VIVIEN,BERGEN,20060215
1,2,1,PENELOPE,GUINESS,20060215


In [153]:

# Rename the Foreign Key Columns
column_name_map = {"payment_id" : "payment_key",
                   "customer_id" : "customer_key",
                   "staff_id" : "staff_key",
                   "rental_id" : "rental_key"
                  }

df_payment.rename(columns=column_name_map, inplace=True)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_payment.insert(0, "fact_payment_key", range(1, df_payment.shape[0]+1))
df_payment.head(2)

Unnamed: 0,fact_payment_key,payment_key,customer_key,staff_key,rental_key,amount,last_update_key,payment_date_key
0,1,1,1,1,76,2.99,20060215,20050525
1,2,2,1,1,573,0.99,20060215,20050528


#### Load Newly Transformed MongoDB Data into the sakila_dw2 Data Warehouse

In [154]:
dataframe = df_actor
table_name = 'fact_actors'
primary_key = 'fact_actor_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [155]:
dataframe = df_payment
table_name = 'fact_payments'
primary_key = 'fact_payment_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Local File System

In [156]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_city.csv')

df_city = pd.read_csv(data_file, header=0, index_col=0)
df_city.head()

Unnamed: 0_level_0,city,country_id,last_update
city_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,A Coruña (La Coruña),87,2006-02-15 04:45:25
2,Abha,82,2006-02-15 04:45:25
3,Abu Dhabi,101,2006-02-15 04:45:25
4,Acuña,60,2006-02-15 04:45:25
5,Adana,97,2006-02-15 04:45:25


#### Perform necessary transformations

In [157]:
column_name_map = {"city_id" : "city_key",
                   "country_id" : "country_key"
                   }

df_city.rename(columns=column_name_map, inplace=True)
df_city.head(2)

df_city.insert(0, "fact_city_key", range(1, df_city.shape[0]+1))
df_city.head(2)

Unnamed: 0_level_0,fact_city_key,city,country_key,last_update
city_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,1,A Coruña (La Coruña),87,2006-02-15 04:45:25
2,2,Abha,82,2006-02-15 04:45:25


#### Convert last_update to a last_update_key

In [158]:
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_city.last_update = df_city.last_update.astype('datetime64').dt.date
df_city = pd.merge(df_city, df_dim_last_update, on='last_update', how='left')
df_city.drop(['last_update'], axis=1, inplace=True)
df_city.head(2)

Unnamed: 0,fact_city_key,city,country_key,last_update_key
0,1,A Coruña (La Coruña),87,20060215
1,2,Abha,82,20060215


#### Load df_city back into sakila_dw2

In [159]:
dataframe = df_city
table_name = 'fact_city'
primary_key = 'fact_city_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Fact Tables were Created

In [160]:
sql_actor = "SELECT * FROM sakila_dw2.fact_actors;"
df_fact_actors = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_actor)
df_fact_actors.head(2)

Unnamed: 0,fact_actor_key,actor_key,first_name,last_name,last_update_key
0,1,14,VIVIEN,BERGEN,20060215
1,2,1,PENELOPE,GUINESS,20060215


In [161]:
sql_payment = "SELECT * FROM sakila_dw2.fact_payments;"
df_fact_payments = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_payment)
df_fact_payments.head(2)

Unnamed: 0,fact_payment_key,payment_key,customer_key,staff_key,rental_key,amount,last_update_key,payment_date_key
0,1,1,1,1,76,2.99,20060215,20050525
1,2,2,1,1,573,0.99,20060215,20050528


In [162]:
sql_city = "SELECT * FROM sakila_dw2.fact_city;"
df_fact_city = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_actor)
df_fact_city.head(2)

Unnamed: 0,fact_actor_key,actor_key,first_name,last_name,last_update_key
0,1,14,VIVIEN,BERGEN,20060215
1,2,1,PENELOPE,GUINESS,20060215


### SQL Query Statements

#### View fact_payments table and sort movies that are above the average price

In [163]:
db_name = "sakila_dw2"

conn = pymysql.connect(host=host_name, user=user_id, password=pwd, database=db_name)

df = pd.read_sql("SELECT * FROM sakila_dw2.fact_payments ORDER BY amount DESC;", conn)

conn.close()
df.head(5)

  df = pd.read_sql("SELECT * FROM sakila_dw2.fact_payments ORDER BY amount DESC;", conn)


Unnamed: 0,fact_payment_key,payment_key,customer_key,staff_key,rental_key,amount,last_update_key,payment_date_key
0,342,342,13,2,8831,11.99,20060215,20050729
1,44,44,2,2,9236,10.99,20060215,20050730
2,69,69,3,2,7503,10.99,20060215,20050727
3,324,324,12,2,10392,10.99,20060215,20050801
4,550,551,21,1,3212,10.99,20060215,20050621


In [164]:
lbound = 15.00
ubound = 20.00

sql_query = """
    SELECT fact_payment_key AS payment_key
        , customer_key
        , amount
    FROM sakila_dw2.fact_payments
    WHERE amount > (SELECT AVG(amount) FROM sakila_dw2.fact_payments)
    ORDER BY amount DESC;
""".format(lbound, ubound)

print(sql_query)


    SELECT fact_payment_key AS payment_key
        , customer_key
        , amount
    FROM sakila_dw2.fact_payments
    WHERE amount > (SELECT AVG(amount) FROM sakila_dw2.fact_payments)
    ORDER BY amount DESC;



In [165]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"

sqlEngine = create_engine(conn_str, pool_recycle=3600)
conn = sqlEngine.connect()

df = pd.read_sql(sql_query, conn);

conn.close()
df.head(20)

Unnamed: 0,payment_key,customer_key,amount
0,342,13,11.99
1,44,2,10.99
2,69,3,10.99
3,324,12,10.99
4,550,21,10.99
5,792,29,10.99
6,907,33,10.99
7,5,1,9.99
8,137,5,9.99
9,224,8,9.99


#### Determine movie titles with highest replacement costs

In [166]:
db_name = "sakila_dw2"

conn = pymysql.connect(host=host_name, user=user_id, password=pwd, database=db_name)

df = pd.read_sql("SELECT title , replacement_cost FROM sakila_dw2.fact_film HAVING replacement_cost > 27.00;", conn)

conn.close()
df.head(20)

  df = pd.read_sql("SELECT title , replacement_cost FROM sakila_dw2.fact_film HAVING replacement_cost > 27.00;", conn)


Unnamed: 0,title,replacement_cost
0,AIRPLANE SIERRA,28.99
1,AIRPLANE SIERRA,28.99
2,AIRPLANE SIERRA,28.99
3,AIRPLANE SIERRA,28.99
4,AIRPLANE SIERRA,28.99
5,ALTER VICTORY,27.99
6,ALTER VICTORY,27.99
7,ALTER VICTORY,27.99
8,ALTER VICTORY,27.99
9,ANYTHING SAVANNAH,27.99


#### Determine which films are the least popular from transaction data

In [167]:
db_name = "sakila_dw2"

conn = pymysql.connect(host=host_name, user=user_id, password=pwd, database=db_name)

df = pd.read_sql("SELECT title, COUNT(*) as times_purchased FROM sakila_dw2.fact_film GROUP BY title HAVING times_purchased < 5 ORDER BY times_purchased ASC;", conn)

conn.close()
df.head(20)

  df = pd.read_sql("SELECT title, COUNT(*) as times_purchased FROM sakila_dw2.fact_film GROUP BY title HAVING times_purchased < 5 ORDER BY times_purchased ASC;", conn)


Unnamed: 0,title,times_purchased
0,BAKED CLEOPATRA,1
1,BRIDE INTRIGUE,1
2,DOLLS RAGE,1
3,DRUMLINE CYCLONE,1
4,DWARFS ALTER,1
5,FERRIS MOTHER,1
6,FLIGHT LIES,1
7,FOREVER CANDIDATE,1
8,GHOSTBUSTERS ELF,1
9,GIANT TROOPERS,1


#### Calculate total inventory for each store using COUNT for each store id

In [168]:
db_name = "sakila_dw2"

conn = pymysql.connect(host=host_name, user=user_id, password=pwd, database=db_name)

df = pd.read_sql("SELECT store_id, COUNT(*) AS total_inventory FROM sakila_dw2.dim_inventory GROUP BY store_id;", conn)

conn.close()
df.head(20)

  df = pd.read_sql("SELECT store_id, COUNT(*) AS total_inventory FROM sakila_dw2.dim_inventory GROUP BY store_id;", conn)


Unnamed: 0,store_id,total_inventory
0,1,2270
1,2,2311
