## Data Science 2002: Project 1/ Midterm (Brooke Livergood)

### ETL Data Processor

You project should demonstrate your understanding of the differing types of data systems (OLTP/OLAP),
and how data can be extracted from various source systems (structured, semi-structured, unstructured),
transformed (cleansed, integrated), and then loaded into a destination system that’s optimized for post
hoc diagnostic analysis.

#### Deliverable:
1. Design a dimensional data mart that represents a simple business process of your choosing.
a. Examples might include retail sales, inventory management, procurement, order
management, transportation or hospitality bookings, medical appointments, student
registration and/or attendance.
b. You may select any business process that interests you, but remember that a
dimensional data mart provides for the post hoc summarization and historic analysis of
business transactions that reflect the interaction between various entities (e.g., patients
& doctors, retailers & customers, students & schools/classes, travelers & airlines/hotels).
2. Develop an ETL pipeline that extracts, transforms, and loads data into your data mart.
a. Extract data from one or more SQL database tables; hosted locally or in the Cloud.
b. Retrieve a data file, either from a remote or local file system, converting its original
format (e.g., CSV, JSON) into a SQL database table.
c. Modify the number of columns from each source to the destination.
3. Author one or more SQL queries (SELECT statements) to demonstrate proper functionality.
a. SELECT data from at least 3 tables (the fact table plus two dimensions).
b. Perform some type of aggregation (e.g., SUM, COUNT, AVERAGE). This, of course,
necessitates some form of grouping operation (e.g., GROUP BY <customer.last_name>).

#### Requirements:
Your solution (database schema) needn’t be complex, but should meet the following requirements:
* Your solution must include a Date dimension to enable the analysis of the business process over
various intervals of time (the code for creating this in MySQL has already been provided for you).
* Your solution must include at least 2 additional dimension tables (e.g., buyers, sellers, products)
* Your solution must include at least 1 fact table that models the business process
* Your solution must use data originating from the 3 of the following sources:
    * A relational database like MySQL or SQL Server.
    * A NoSQL database like MongoDB or Azure Cosmos DB.
    * A file system (either local or remote).
    * An API that returns a message payload (e.g., JSON, CSV, text) optional.

#### Benchmarks:
1. You must submit all data used to populate the data mart (source databases, JSON/CSV files, etc.)
2. You must submit all Python code needed to implement data integration, and any object creation.

Please submit all code, and other artifacts, in a standalone GitHub repository in your account. If you opt to use any cloud-hosted services then please identify them so we may faithfully replicate your project.

#### Grading:
* Successful deployment – 40%.
* Functionality that meets all benchmarks – 50%.
* Documentation – Describe your process, code, deployment strategy – 10%.

#### Import the Necessary Libraries

In [749]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working

In [751]:
#mySQL
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "M1ssb3lle76!"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

#mongo_db
mongodb_args = {
    "user_name" : "blive",
    "password" : "M1ssb3lle76!",
    "cluster_name" : "sandbox",
    "cluster_subnet" : "40lw0",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_customers"}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [753]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    # creates a new table in mySQL using pandas
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        # if table exists, replace it
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
        # code in youtube video says sqlEngine.execute and not connection.execute
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.

In [755]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

#myEngine.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
# connection is replaced with myEngine.execute in video
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

In [756]:
print(conn_str)

mysql+pymysql://root:M1ssb3lle76!@localhost


#### Pulling Data from MondoDB

In [758]:
#customers

client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.getcwd()

json_files = {"customer" : 'sakila_customer.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files) 

In [759]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Pulling Data from CSV file

In [761]:
#payment
df_payment = pd.read_csv('payment.csv', sep=';')
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Extract Data from mySQL

In [763]:
#customers
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [764]:
#category
sql_category = "SELECT * FROM sakila.category;"
df_category = get_dataframe(user_id, pwd, host_name, src_dbname, sql_category)
df_category.head(2)

Unnamed: 0,category_id,name,last_update
0,1,Action,2006-02-15 04:46:27
1,2,Animation,2006-02-15 04:46:27


In [765]:
#film
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [766]:
#actor
sql_actor = "SELECT * FROM sakila.actor;"
df_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_actor)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


In [767]:
#film actor
sql_film_actor = "SELECT * FROM sakila.film_actor;"
df_film_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_actor)
df_film_actor.head(2)

Unnamed: 0,actor_id,film_id,last_update
0,1,1,2006-02-15 05:05:03
1,1,23,2006-02-15 05:05:03


In [768]:
#film category
sql_film_category = "SELECT * FROM sakila.film_category;"
df_film_category = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film_category)
df_film_category.head(2)

Unnamed: 0,film_id,category_id,last_update
0,1,6,2006-02-15 05:07:09
1,2,11,2006-02-15 05:07:09


In [769]:
#inventory
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [770]:
#rental
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [771]:
#store
sql_store = "SELECT * FROM sakila.store;"
df_store = get_dataframe(user_id, pwd, host_name, src_dbname, sql_store)
df_store.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


In [789]:
#address
sql_address = "SELECT * FROM sakila.address;"
df_address = get_dataframe(user_id, pwd, host_name, src_dbname, sql_address)
df_address.head(2)

Unnamed: 0,address_id,address,address2,district,city_id,postal_code,phone,location,last_update
0,1,47 MySakila Drive,,Alberta,300,,,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00>\n2]c4\...,2014-09-25 22:30:27
1,2,28 MySQL Boulevard,,QLD,576,,,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x8e\x10...,2014-09-25 22:30:09


#### Create Date Dimension Table

Used the code given in Lab 2c. 
USE sakila_dw;

#### Performing any necessary transformations

In [791]:
# Customers
drop_cols = ['email','active', 'last_update']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

df_customers.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,address_id,create_date
0,1,1,1,MARY,SMITH,5,2006-02-14 22:04:36
1,2,2,1,PATRICIA,JOHNSON,6,2006-02-14 22:04:36


In [793]:
#payment
drop_cols = ['staff_id', 'last_update']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.insert(0, "payment_key", range(1, df_payment.shape[0]+1))

df_payment.head(2)

Unnamed: 0,payment_key,payment_id,customer_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,2,1,573,0.99,2005-05-28 10:35:23


In [795]:
#category
drop_cols = ['last_update']
df_category.drop(drop_cols, axis=1, inplace=True)
df_category.insert(0, "category_key", range(1, df_category.shape[0]+1))

df_category.head(2)

Unnamed: 0,category_key,category_id,name
0,1,1,Action
1,2,2,Animation


In [797]:
#film
drop_cols = ['language_id', 'original_language_id','last_update']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))

df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [799]:
#actor
drop_cols = ['last_update']
df_actor.drop(drop_cols, axis=1, inplace=True)
df_actor.insert(0, "actor_key", range(1, df_actor.shape[0]+1))

df_actor.head(2)

Unnamed: 0,actor_key,actor_id,first_name,last_name
0,1,1,PENELOPE,GUINESS
1,2,2,NICK,WAHLBERG


In [801]:
#film actor
drop_cols = ['last_update']
df_film_actor.drop(drop_cols, axis=1, inplace=True)
df_film_actor.insert(0, "film_actor_key", range(1, df_film_actor.shape[0]+1))

df_film_actor.head(2)

Unnamed: 0,film_actor_key,actor_id,film_id
0,1,1,1
1,2,1,23


In [803]:
#film category
drop_cols = ['last_update']
df_film_category.drop(drop_cols, axis=1, inplace=True)
df_film_category.insert(0, "film_category_key", range(1, df_film_category.shape[0]+1))

df_film_category.head(2)

Unnamed: 0,film_category_key,film_id,category_id
0,1,1,6
1,2,2,11


In [805]:
#inventory
drop_cols = ['last_update']
df_inventory.drop(drop_cols, axis=1, inplace=True)
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))

df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


In [807]:
#rental
drop_cols = ['staff_id','last_update']
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.insert(0, "rental_key", range(1, df_rental.shape[0]+1))

df_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,inventory_id,customer_id,return_date
0,1,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30
1,2,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33


In [809]:
#store
drop_cols = ['manager_staff_id','last_update']
df_store.drop(drop_cols, axis=1, inplace=True)
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))

df_store.head(2)

Unnamed: 0,store_key,store_id,address_id
0,1,1,1
1,2,2,2


In [811]:
#address
drop_cols = ['address2', 'location','last_update']
df_address.drop(drop_cols, axis=1, inplace=True)
df_address.insert(0, "address_key", range(1, df_address.shape[0]+1))

df_address.head(2)

Unnamed: 0,address_key,address_id,address,district,city_id,postal_code,phone
0,1,1,47 MySakila Drive,Alberta,300,,
1,2,2,28 MySQL Boulevard,QLD,576,,


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [814]:
df_payment.head(2)

Unnamed: 0,payment_key,payment_id,customer_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,2,1,573,0.99,2005-05-28 10:35:23


In [816]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_payment', df_payment, 'payment_key'),
          ('dim_category', df_category, 'category_key'),
          ('dim_film', df_film, 'film_key'),
          ('dim_actor', df_actor, 'actor_key'),
          ('dim_film_actor', df_film_actor, 'film_actor_key'),
          ('dim_film_category', df_film_category, 'film_category_key'),
          ('dim_inventory', df_inventory, 'inventory_key'),
          ('dim_rental', df_rental, 'rental_key'),
          ('dim_store', df_store, 'store_key'),
          ('dim_address', df_address, 'address_key')]

In [818]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### create and populate fact table

making a fact table for the data surrounding film rentals: store key, store address, rental (rental key, rental date and return date, customer id), payment (payment key, customer id, rental id, price, date), inventory (inventory key, film id, store id), film (film key, title, rental duration, rental rate, replacement cost), customer(first name, last name, customer key, store id)

make a fact table for film, actor, film actor, film category

In [821]:
# merge on inventory id to combine rental and inventory
df_rental = pd.merge(df_rental, df_inventory, on='inventory_id', how='inner')
df_rental.rename(columns={"store_id":"inventory_store_id"}, inplace=True)
df_rental.drop(['inventory_id'], axis=1, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_key,rental_id,rental_date,customer_id,return_date,inventory_key,film_id,inventory_store_id
0,1,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30,367,80,1
1,2,2,2005-05-24 22:54:33,459,2005-05-28 19:40:33,1525,333,2


In [823]:
# merge on customer_id to combine payment and customer
df_payment = pd.merge(df_payment, df_customers, on='customer_id', how='inner')
df_payment.rename(columns={"store_id":"customer_store_id"}, inplace=True)
df_payment.drop(['customer_id'], axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_key,payment_id,rental_id,amount,payment_date,customer_key,customer_store_id,first_name,last_name,address_id,create_date
0,1,1,76,2.99,2005-05-25 11:30:37,1,1,MARY,SMITH,5,2006-02-14 22:04:36
1,2,2,573,0.99,2005-05-28 10:35:23,1,1,MARY,SMITH,5,2006-02-14 22:04:36


#### Join DataFrames

In [826]:
df_fact_films = df_rental = pd.merge(df_payment, df_rental, on='rental_id', how='left')
df_fact_films.head(5)

Unnamed: 0,payment_key,payment_id,rental_id,amount,payment_date,customer_key,customer_store_id,first_name,last_name,address_id,create_date,rental_key,rental_date,customer_id,return_date,inventory_key,film_id,inventory_store_id
0,1,1,76,2.99,2005-05-25 11:30:37,1,1,MARY,SMITH,5,2006-02-14 22:04:36,76,2005-05-25 11:30:37,1,2005-06-03 12:00:37,3021,663,2
1,2,2,573,0.99,2005-05-28 10:35:23,1,1,MARY,SMITH,5,2006-02-14 22:04:36,572,2005-05-28 10:35:23,1,2005-06-03 06:32:23,4020,875,2
2,3,3,1185,5.99,2005-06-15 00:54:12,1,1,MARY,SMITH,5,2006-02-14 22:04:36,1184,2005-06-15 00:54:12,1,2005-06-23 02:42:12,2785,611,1
3,4,4,1422,0.99,2005-06-15 18:02:53,1,1,MARY,SMITH,5,2006-02-14 22:04:36,1421,2005-06-15 18:02:53,1,2005-06-19 15:54:53,1021,228,2
4,5,5,1476,9.99,2005-06-15 21:08:46,1,1,MARY,SMITH,5,2006-02-14 22:04:36,1475,2005-06-15 21:08:46,1,2005-06-25 02:26:46,1407,308,1


In [828]:
df_fact_films.shape

(16044, 18)

#### lookup primary keys from dim tables

In [831]:
#store
sql_dim_store = "SELECT store_key, store_id FROM sakila_dw.dim_store;"
df_dim_store = get_dataframe(user_id, pwd, host_name, src_dbname,sql_dim_store)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


In [833]:
#film
sql_dim_film = "SELECT film_key, film_id FROM sakila_dw.dim_film;"
df_dim_film = get_dataframe(user_id, pwd, host_name, src_dbname,sql_dim_film)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


In [835]:
#address
sql_dim_address = "SELECT address_key, address_id FROM sakila_dw.dim_address;"
df_dim_address = get_dataframe(user_id, pwd, host_name, src_dbname,sql_dim_address)
df_dim_address.head(2)

Unnamed: 0,address_key,address_id
0,1,1
1,2,2


#### use business keys to look up corresponding surrogate primary key vaues in dim tables

In [838]:
df_fact_films.head(5)

Unnamed: 0,payment_key,payment_id,rental_id,amount,payment_date,customer_key,customer_store_id,first_name,last_name,address_id,create_date,rental_key,rental_date,customer_id,return_date,inventory_key,film_id,inventory_store_id
0,1,1,76,2.99,2005-05-25 11:30:37,1,1,MARY,SMITH,5,2006-02-14 22:04:36,76,2005-05-25 11:30:37,1,2005-06-03 12:00:37,3021,663,2
1,2,2,573,0.99,2005-05-28 10:35:23,1,1,MARY,SMITH,5,2006-02-14 22:04:36,572,2005-05-28 10:35:23,1,2005-06-03 06:32:23,4020,875,2
2,3,3,1185,5.99,2005-06-15 00:54:12,1,1,MARY,SMITH,5,2006-02-14 22:04:36,1184,2005-06-15 00:54:12,1,2005-06-23 02:42:12,2785,611,1
3,4,4,1422,0.99,2005-06-15 18:02:53,1,1,MARY,SMITH,5,2006-02-14 22:04:36,1421,2005-06-15 18:02:53,1,2005-06-19 15:54:53,1021,228,2
4,5,5,1476,9.99,2005-06-15 21:08:46,1,1,MARY,SMITH,5,2006-02-14 22:04:36,1475,2005-06-15 21:08:46,1,2005-06-25 02:26:46,1407,308,1


In [840]:
#film
df_fact_films = pd.merge(df_fact_films, df_dim_film, on='film_id', how='inner')
df_fact_films.drop(['film_id'], axis=1, inplace=True)
df_fact_films.head(2)

Unnamed: 0,payment_key,payment_id,rental_id,amount,payment_date,customer_key,customer_store_id,first_name,last_name,address_id,create_date,rental_key,rental_date,customer_id,return_date,inventory_key,inventory_store_id,film_key
0,1,1,76,2.99,2005-05-25 11:30:37,1,1,MARY,SMITH,5,2006-02-14 22:04:36,76,2005-05-25 11:30:37,1,2005-06-03 12:00:37,3021,2,663
1,2,2,573,0.99,2005-05-28 10:35:23,1,1,MARY,SMITH,5,2006-02-14 22:04:36,572,2005-05-28 10:35:23,1,2005-06-03 06:32:23,4020,2,875


In [842]:
#address
df_fact_films = pd.merge(df_fact_films, df_dim_address, on='address_id', how='inner')
df_fact_films.drop(['address_id'], axis=1, inplace=True)
df_fact_films.head(2)

Unnamed: 0,payment_key,payment_id,rental_id,amount,payment_date,customer_key,customer_store_id,first_name,last_name,create_date,rental_key,rental_date,customer_id,return_date,inventory_key,inventory_store_id,film_key,address_key
0,1,1,76,2.99,2005-05-25 11:30:37,1,1,MARY,SMITH,2006-02-14 22:04:36,76,2005-05-25 11:30:37,1,2005-06-03 12:00:37,3021,2,663,5
1,2,2,573,0.99,2005-05-28 10:35:23,1,1,MARY,SMITH,2006-02-14 22:04:36,572,2005-05-28 10:35:23,1,2005-06-03 06:32:23,4020,2,875,5


#### lookup date keys

In [845]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [847]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "payment_date" Column.

df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_films.payment_date = df_fact_films.payment_date.astype('datetime64[ns]').dt.date

df_fact_films = pd.merge(df_fact_films, df_dim_payment_date, on='payment_date', how='left')
df_fact_films.drop(['payment_date'], axis=1, inplace=True)
df_fact_films.head(2)

Unnamed: 0,payment_key,payment_id,rental_id,amount,customer_key,customer_store_id,first_name,last_name,create_date,rental_key,rental_date,customer_id,return_date,inventory_key,inventory_store_id,film_key,address_key,payment_date_key
0,1,1,76,2.99,1,1,MARY,SMITH,2006-02-14 22:04:36,76,2005-05-25 11:30:37,1,2005-06-03 12:00:37,3021,2,663,5,20050525
1,2,2,573,0.99,1,1,MARY,SMITH,2006-02-14 22:04:36,572,2005-05-28 10:35:23,1,2005-06-03 06:32:23,4020,2,875,5,20050528


In [849]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "rental_date" Column.

df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_films.rental_date = df_fact_films.rental_date.astype('datetime64[ns]').dt.date

df_fact_films = pd.merge(df_fact_films, df_dim_rental_date, on='rental_date', how='left')
df_fact_films.drop(['rental_date'], axis=1, inplace=True)
df_fact_films.head(2)

Unnamed: 0,payment_key,payment_id,rental_id,amount,customer_key,customer_store_id,first_name,last_name,create_date,rental_key,customer_id,return_date,inventory_key,inventory_store_id,film_key,address_key,payment_date_key,rental_date_key
0,1,1,76,2.99,1,1,MARY,SMITH,2006-02-14 22:04:36,76,1,2005-06-03 12:00:37,3021,2,663,5,20050525,20050525
1,2,2,573,0.99,1,1,MARY,SMITH,2006-02-14 22:04:36,572,1,2005-06-03 06:32:23,4020,2,875,5,20050528,20050528


In [851]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "return_date" Column.

df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_films.return_date = df_fact_films.return_date.astype('datetime64[ns]').dt.date

df_fact_films = pd.merge(df_fact_films, df_dim_return_date, on='return_date', how='left')
df_fact_films.drop(['return_date'], axis=1, inplace=True)
df_fact_films.head(2)

Unnamed: 0,payment_key,payment_id,rental_id,amount,customer_key,customer_store_id,first_name,last_name,create_date,rental_key,customer_id,inventory_key,inventory_store_id,film_key,address_key,payment_date_key,rental_date_key,return_date_key
0,1,1,76,2.99,1,1,MARY,SMITH,2006-02-14 22:04:36,76,1,3021,2,663,5,20050525,20050525,20050603.0
1,2,2,573,0.99,1,1,MARY,SMITH,2006-02-14 22:04:36,572,1,4020,2,875,5,20050528,20050528,20050603.0


#### perform any additional transformations

In [857]:
# drop columns of no interest
drop_columns = ['payment_id', 'rental_id', 'create_date', 'customer_store_id','customer_id']
df_fact_films.drop(drop_columns,axis=1,inplace=True)

# reorder remaining columns
ordered_columns = ['film_key','payment_key', 'amount', 'payment_date_key','customer_key', 'first_name',
                   'last_name', 'email','address', 'active', 'rental_key','rental_date_key','return_date_key',
                   'inventory_key', 'inventory_store_id']

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_films.insert(0,"fact_film_key", range(1, df_fact_films.shape[0]+1))
df_fact_films.head(2)

Unnamed: 0,fact_film_key,payment_key,amount,customer_key,first_name,last_name,rental_key,inventory_key,inventory_store_id,film_key,address_key,payment_date_key,rental_date_key,return_date_key
0,1,1,2.99,1,MARY,SMITH,76,3021,2,663,5,20050525,20050525,20050603.0
1,2,2,0.99,1,MARY,SMITH,572,4020,2,875,5,20050528,20050528,20050603.0


#### write the data frame back to the database

In [860]:
table_name = "fact_films"
primary_key = "fact_film_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_films, table_name, primary_key, db_operation)

#### demonstrate that the new data warehouse exists and contains the correct data

In [900]:
sql_test = """
    SELECT customers.`last_name` AS `customer_name`,
    	SUM(films.`amount`) AS `total_amount_spent`,
        COUNT(rental.`rental_key`) AS `total_rentals`
    FROM `{0}`.`fact_films` AS films
    JOIN `{0}`.`dim_customers` AS customers
    ON films.customer_key = customers.customer_key
    JOIN `{0}`.`dim_rental` AS rental
    ON films.rental_key = rental.rental_key
    GROUP BY customers.`last_name`
    ORDER BY total_amount_spent DESC;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,customer_name,total_amount_spent,total_rentals
0,SEAL,221.55,45
1,HUNT,216.54,46
2,SHAW,195.58,42
3,KENNEDY,194.61,39
4,SNYDER,194.61,39
