## DS 2002 Capstone Project - Alysha Akhtar (yhc8vx)

This project was created using the sakila database. Initially, the sakila-schema.sql and the sakila-data.sql files were run to create the original sakila database in MySQL.

The midterm project scripts to create the fact table and 3 dimension tables in sakila_dm in MySQL were run.

### Section I: Prerequisites

#### Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "yhc8vx-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dm"

connection_properties = {
  "user" : "yhc8vx",
  "password" : "Passw0rd123!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.ks1ubfx"
atlas_database_name = "stakila"
atlas_user_name = "yhc8vx"
atlas_password = "Passw0rd123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/capstone_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/movierentals"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[260]: True

#### Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://yhc8vx-mysql.mysql.database.azure.com:3306/sakila_dm", --Replace with your Server Name
  dbtable "dim_date",
  user "yhc8vx",    --Replace with your User Name
  password "Passw0rd123!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_customer" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://yhc8vx-mysql.mysql.database.azure.com:3306/sakila_dm", --Replace with your Server Name
  dbtable "dim_customer",
  user "yhc8vx",    --Replace with your User Name
  password "Passw0rd123!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_customer" using data from the view named "view_product"
CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/capstone_data/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
first_name,string,
last_name,string,
email,string,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customer,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,first_name,last_name,email,active
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1


#### Fetch Reference Data from a MongoDB Atlas Database
##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/capstone_data/movierentals/batch'

path,name,size,modificationTime
dbfs:/FileStore/capstone_data/movierentals/batch/Sakila_DimFilm.json,Sakila_DimFilm.json,197638,1701746827000
dbfs:/FileStore/capstone_data/movierentals/batch/Sakila_DimStaff.csv,Sakila_DimStaff.csv,198,1701746827000


##### Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/capstone_data/movierentals/batch'
json_files = {"films" : 'Sakila_DimFilm.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[273]: <pymongo.results.InsertManyResult at 0x7f2209aa9b00>

##### Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "yhc8vx"
val pwd = "Passw0rd123"
val clusterName = "cluster0.ks1ubfx"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "stakila")
.option("collection", "films").load()
.select("film_key","title","release_year","rental_duration","rental_rate","length","replacement_cost","rating")


In [0]:
%scala
df_film.printSchema()

##### Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film

col_name,data_type,comment
film_key,int,
title,string,
release_year,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,title,release_year,rental_duration,rental_rate,length,replacement_cost,rating
1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG
2,ACE GOLDFINGER,2006,3,4.99,48,12.99,G
3,ADAPTATION HOLES,2006,7,2.99,50,18.99,NC-17
4,AFFAIR PREJUDICE,2006,5,2.99,117,26.99,G
5,AFRICAN EGG,2006,6,2.99,130,22.99,G


#### Fetch Data from a File System
##### Use PySpark to Read From a CSV File

In [0]:
staff_csv = f"{batch_dir}/Sakila_DimStaff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_key,first_name,last_name,email,username,password
1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964
2,Jon,Stephens,Jon.Stephens@sakilastaff.com,Jon,


In [0]:
df_staff.printSchema()

root
 |-- staff_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
email,string,
username,string,
password,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_key,first_name,last_name,email,username,password
1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964
2,Jon,Stephens,Jon.Stephens@sakilastaff.com,Jon,


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_staff,False
,display_query_1,True
,display_query_10,True
,display_query_11,True
,display_query_12,True
,display_query_13,True
,display_query_14,True


### Section III: Integrate Reference Data with Real-Time Data
#### Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_rental_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "film_key BIGINT") 
 .option("cloudFiles.schemaHints", "staff_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_date_key DECMINAL")
 .option("cloudFiles.schemaHints", "return_date_key DECIMAL")
 .option("cloudFiles.schemaHints", "amount DOUBLE")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview
LIMIT 20

amount,customer_key,fact_rental_key,film_key,rental_date_key,rental_key,return_date_key,staff_key,_rescued_data,receipt_time,source_file
0.99,418,10697,423,20050820,13970,20050822,2,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
2.99,494,10698,447,20050706,3803,20050708,1,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
4.99,469,10699,447,20050801,10258,20050808,2,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
5.99,403,10700,447,20050817,12005,20050825,2,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
7.99,126,10701,117,20050706,3804,20050715,2,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
4.99,347,10702,117,20050728,8148,20050802,1,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
4.99,247,10703,117,20050818,12265,20050820,1,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
0.99,418,10704,374,20050706,3805,20050707,1,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
0.99,434,10705,374,20050727,6972,20050728,2,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
2.99,580,10706,374,20050818,12670,20050823,2,,2023-12-06T05:34:28.418+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[285]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f21ce0050a0>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview
LIMIT 20

amount,customer_key,fact_rental_key,film_key,rental_date_key,rental_key,return_date_key,staff_key,_rescued_data,receipt_time,source_file
0.99,418,10697,423,20050820,13970,20050822,2,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
2.99,494,10698,447,20050706,3803,20050708,1,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
4.99,469,10699,447,20050801,10258,20050808,2,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
5.99,403,10700,447,20050817,12005,20050825,2,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
7.99,126,10701,117,20050706,3804,20050715,2,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
4.99,347,10702,117,20050728,8148,20050802,1,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
4.99,247,10703,117,20050818,12265,20050820,1,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
0.99,418,10704,374,20050706,3805,20050707,1,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
0.99,434,10705,374,20050727,6972,20050728,2,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json
2.99,580,10706,374,20050818,12670,20050823,2,,2023-12-06T04:47:04.482+0000,dbfs:/FileStore/capstone_data/movierentals/stream/rentals/Sakila_Fact_Rentals03.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
fact_rental_key,bigint,
film_key,bigint,
rental_date_key,bigint,
rental_key,bigint,
return_date_key,bigint,
staff_key,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.fact_rental_key,
      r.rental_key,

      r.customer_key,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      c.email AS customer_email,
      c.active AS cutomer_active_status,

      r.film_key,
      f.title AS film_title,
      f.release_year AS film_release_year,
      f.rental_duration AS film_rental_duration,
      f.rental_rate AS film_rental_ratw,
      f.length AS film_length,
      f.replacement_cost AS film_replacement_cost,
      f.rating AS film_rating,

      r.staff_key,
      s.first_name AS staff_first_name,
      s.last_name AS staff_last_name,
      s.email AS staff_email,

      r.rental_date_key,
      rld.day_name_of_week AS rental_day_name_of_week,
      rld.day_of_month AS rental_day_of_month,
      rld.weekday_weekend AS rental_weekday_weekend,
      rld.month_name AS rental_month_name,
      rld.calendar_quarter AS rental_quarter,
      rld.calendar_year AS rental_year,

      r.return_date_key,
      rnd.day_name_of_week AS return_day_name_of_week,
      rnd.day_of_month AS return_day_of_month,
      rnd.weekday_weekend AS return_weekday_weekend,
      rnd.month_name AS return_month_name,
      rnd.calendar_quarter AS return_quarter,
      rnd.calendar_year AS return_year,

      r.amount AS payment_amount

  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_film AS f
  ON f.film_key = r.film_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rld
  ON rld.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rnd
  ON rnd.date_key = r.return_date_key
)

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[290]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f21ce0053a0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver
LIMIT 20

fact_rental_key,rental_key,customer_key,customer_first_name,customer_last_name,customer_email,cutomer_active_status,film_key,film_title,film_release_year,film_rental_duration,film_rental_ratw,film_length,film_replacement_cost,film_rating,staff_key,staff_first_name,staff_last_name,staff_email,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,payment_amount
10697,13970,418,JEFF,EAST,JEFF.EAST@sakilacustomer.org,1,423,HOLLYWOOD ANONYMOUS,2006,7,0.99,69,29.99,PG,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050820,Saturday,20,Weekend,August,3,2005,20050822,Monday,22,Weekday,August,3,2005,0.99
10698,3803,494,RAMON,CHOATE,RAMON.CHOATE@sakilacustomer.org,1,447,ICE CROSSING,2006,5,2.99,131,28.99,R,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,20050706,Wednesday,6,Weekday,July,3,2005,20050708,Friday,8,Weekday,July,3,2005,2.99
10699,10258,469,WESLEY,BULL,WESLEY.BULL@sakilacustomer.org,1,447,ICE CROSSING,2006,5,2.99,131,28.99,R,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050801,Monday,1,Weekday,August,3,2005,20050808,Monday,8,Weekday,August,3,2005,4.99
10700,12005,403,MIKE,WAY,MIKE.WAY@sakilacustomer.org,1,447,ICE CROSSING,2006,5,2.99,131,28.99,R,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050817,Wednesday,17,Weekday,August,3,2005,20050825,Thursday,25,Weekday,August,3,2005,5.99
10701,3804,126,ELLEN,SIMPSON,ELLEN.SIMPSON@sakilacustomer.org,1,117,CANDLES GRAPES,2006,6,4.99,135,15.99,NC-17,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050706,Wednesday,6,Weekday,July,3,2005,20050715,Friday,15,Weekday,July,3,2005,7.99
10702,8148,347,RYAN,SALISBURY,RYAN.SALISBURY@sakilacustomer.org,1,117,CANDLES GRAPES,2006,6,4.99,135,15.99,NC-17,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,20050728,Thursday,28,Weekday,July,3,2005,20050802,Tuesday,2,Weekday,August,3,2005,4.99
10703,12265,247,STELLA,MORENO,STELLA.MORENO@sakilacustomer.org,1,117,CANDLES GRAPES,2006,6,4.99,135,15.99,NC-17,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,20050818,Thursday,18,Weekday,August,3,2005,20050820,Saturday,20,Weekend,August,3,2005,4.99
10704,3805,418,JEFF,EAST,JEFF.EAST@sakilacustomer.org,1,374,GRAFFITI LOVE,2006,3,0.99,117,29.99,PG,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,20050706,Wednesday,6,Weekday,July,3,2005,20050707,Thursday,7,Weekday,July,3,2005,0.99
10705,6972,434,EDDIE,TOMLIN,EDDIE.TOMLIN@sakilacustomer.org,1,374,GRAFFITI LOVE,2006,3,0.99,117,29.99,PG,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050727,Wednesday,27,Weekday,July,3,2005,20050728,Thursday,28,Weekday,July,3,2005,0.99
10706,12670,580,ROSS,GREY,ROSS.GREY@sakilacustomer.org,1,374,GRAFFITI LOVE,2006,3,0.99,117,29.99,PG,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050818,Thursday,18,Weekday,August,3,2005,20050823,Tuesday,23,Weekday,August,3,2005,2.99


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_rental_key,bigint,
rental_key,bigint,
customer_key,bigint,
customer_first_name,string,
customer_last_name,string,
customer_email,string,
cutomer_active_status,int,
film_key,bigint,
film_title,string,
film_release_year,int,


##### Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach.

**Calculate the total amount spent by each customer by month.**

This is valuable for business owners to identify the customers that spent the most and how their spending varied monthly.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_total_spent_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , rental_month_name AS RentalMonth
    , SUM(payment_amount) AS TotalSpent
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY CustomerID, LastName, FirstName, RentalMonth
  ORDER BY TotalSpent DESC);

SELECT * FROM sakila_dlh.fact_monthly_total_spent_by_customer_gold
LIMIT 20;

CustomerID,LastName,FirstName,RentalMonth,TotalSpent
148,HUNT,ELEANOR,July,100.77999999999996
470,ALLARD,GORDON,July,96.83
522,HAVENS,ARNOLD,July,96.81999999999998
137,KENNEDY,RHONDA,July,96.81
144,SHAW,CLARA,July,93.82
459,COLLAZO,TOMMY,July,89.82
257,DOUGLAS,MARSHA,July,88.82
148,HUNT,ELEANOR,August,87.82
295,BATES,DAISY,July,87.81999999999998
410,IRBY,CURTIS,August,86.83


**Calculate the number of rentals of each film each year.**

This is valuable for business owners to determine which films are the most popular in a given year so that they can restock similar films in the future.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_yearly_rentals_by_film_gold AS (
  SELECT film_title AS FilmTitle
    , rental_year AS RentalYear
    , COUNT(film_key) AS RentalCount
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY FilmTitle, RentalYear
  ORDER BY RentalCount DESC);

SELECT * FROM sakila_dlh.fact_yearly_rentals_by_film_gold
LIMIT 20;

FilmTitle,RentalYear,RentalCount
BUCKET BROTHERHOOD,2005,34
ROCKETEER MOTHER,2005,33
SCALAWAG DUCK,2005,32
FORWARD TEMPLE,2005,32
GRIT CLOCKWORK,2005,32
WIFE TURN,2005,31
RIDGEMONT SUBMARINE,2005,31
GOODFELLAS SALUTE,2005,31
JUGGLER HARDLY,2005,31
NETWORK PEAK,2005,31


**Calculate the count of how many films each customer has rented of each rating**

This is useful for business owners to know which rating of movies different customers typically rent.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_film_rating_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS CustomerLastName
    , customer_first_name AS CustomerFirstName
    , film_rating AS FilmRating
    , COUNT(*) AS RentalCount
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY CustomerID, CustomerLastName, CustomerFirstName, FilmRating
  ORDER BY CustomerLastName ASC);

SELECT * FROM sakila_dlh.fact_film_rating_by_customer_gold
LIMIT 20;

CustomerID,CustomerLastName,CustomerFirstName,FilmRating,RentalCount
505,ABNEY,RAFAEL,PG,5
505,ABNEY,RAFAEL,NC-17,4
505,ABNEY,RAFAEL,R,2
505,ABNEY,RAFAEL,G,3
505,ABNEY,RAFAEL,PG-13,7
504,ADAM,NATHANIEL,PG-13,4
504,ADAM,NATHANIEL,PG,3
504,ADAM,NATHANIEL,NC-17,6
504,ADAM,NATHANIEL,R,7
504,ADAM,NATHANIEL,G,8


#### Clean up the File System

In [0]:
%fs rm -r /FileStore/capstone_data/