## DS2002 Capstone - Eva Massarelli

**Deliverable:** Design and populate a dimensional (star schema) data mart that represents a simple
business process of your choosing. Examples might include retail sales, procurement, order
management, transportation or hospitality bookings, medical appointments, student registration and/or
attendance. You may select any business process that interests you, but remember that a dimensional
data mart provides for the post hoc summarization and historic analysis of business transactions
(represented by a Fact table) that reflect the interaction between various business entities (e.g., patients
& doctors, retailers & customers, travelers & airlines/hotels) as represented by dimension tables.

Your project should demonstrate your ability to implement the Data Lakehouse architecture using
Databricks Spark Structured Streaming & Delta Tables technologies. It should demonstrate your
understanding of the differing types of data systems (e.g., Relational (aka SQL), NoSQL, File Systems,
APIs), and how data (structured, semi-structured, unstructured) can be extracted from those source
systems, transformed (cleansed, integrated), and then loaded into (or exposed through) a destination
system that’s optimized for post hoc diagnostic analysis. Your project should also demonstrate your
knowledge of data integration design patterns like ETL, ELT and ELTL, and architectures (e.g., lambda or
kappa) for integrating batch and real-time (streaming) data sources.

**Benchmarks:**
1. Your solution must demonstrate accumulating data that originates from a real-time (streaming)
data source for a predetermined interval (mini-batch), integrating it with reference data, and
then using the product as a source for populating some aspect of your dimensional data mart.
(i.e., implement something like the Databricks bronze, silver, gold architecture).
    - a. Your solution must demonstrate the integration of streaming data for at least 3 intervals.
This behavior can be mimicked by exporting transaction (fact table) data into a collection
of data files (e.g., JSON, CSV) that represent a sequence of rows from that table.
    - b. Your data visualization(s) need NOT reflect the integration of data in real-time.
2. You must submit all reference data used to populate the source databases, JSON/CSV files, etc.
3. You must submit all SQL code, including any data definition and data manipulation statements.
4. You must submit all Python code needed to implement data integration, and any object creation.
5. You must submit either all data visualization source files (e.g., Excel, Power BI workbook) OR screen-grabs of your finished data visualization(s).
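Benchmark 1a allows streaming to be mimicked by exporting fact-table rows into a sequence of data files. A minimal plain-Python sketch of that export, assuming the fact rows are already available as a list of dicts (for example, exported from MySQL); the function name `export_mini_batches` is hypothetical, and the file prefix only mirrors the `Sakila_DimFactOrders0N.json` names that appear in the bronze output later in this notebook.

```python
import json
import os


def export_mini_batches(rows, out_dir, batch_count=3, prefix="Sakila_DimFactOrders"):
    """Split fact-table rows into `batch_count` JSON files, preserving row order.

    Each file holds one contiguous slice of rows, so copying the files into a
    watched directory one at a time mimics a stream arriving in mini-batches.
    """
    os.makedirs(out_dir, exist_ok=True)
    batch_size = -(-len(rows) // batch_count)  # ceiling division
    paths = []
    for i in range(batch_count):
        batch = rows[i * batch_size:(i + 1) * batch_size]
        path = os.path.join(out_dir, f"{prefix}{i + 1:02d}.json")
        with open(path, "w") as f:
            # Each file is one JSON array, which is why the stream is read with multiLine=true
            json.dump(batch, f, indent=2)
        paths.append(path)
    return paths
```

Calling `export_mini_batches(rows, "/tmp/rentals")` with ten rows would produce three files holding 4, 4 and 2 rows; uploading them one by one gives the three streaming intervals the benchmark asks for.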


### Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # pandas API on Spark (formerly Koalas), included in PySpark 3.2 and newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ecm8yu-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw2"

connection_properties = {
  "user" : "ecm8yu",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.den9rfu"
atlas_database_name = "sakila_dw2"
atlas_user_name = "ecm8yu"
atlas_password = "HR91sK8EWrAUzWRM"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/MassarelliEva-Capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[10]: True
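The JDBC URL and credentials above are repeated verbatim in every `CREATE TEMPORARY VIEW` cell below. A minimal sketch, assuming the values are assembled in Python instead, that builds the same `jdbc:mysql://host:port/database` URL from the global variables and reads the password from an environment variable; the helper names and the `MYSQL_PASSWORD` variable are hypothetical. On Databricks, `dbutils.secrets.get(scope=..., key=...)` is the usual place to keep such a secret instead.

```python
import os


def build_jdbc_url(hostname, port, database):
    """Compose a MySQL JDBC URL of the form jdbc:mysql://host:port/database."""
    return f"jdbc:mysql://{hostname}:{port}/{database}"


def build_jdbc_options(hostname, port, database, table, user, password_env_var="MYSQL_PASSWORD"):
    """Return reader options for one table.

    The password is read from an environment variable (hypothetical name)
    rather than being hard-coded in the notebook.
    """
    password = os.environ.get(password_env_var)
    if password is None:
        raise KeyError(f"Set the {password_env_var} environment variable first")
    return {
        "url": build_jdbc_url(hostname, port, database),
        "dbtable": table,
        "user": user,
        "password": password,
    }
```

These options could then be passed as `spark.read.format("jdbc").options(**build_jdbc_options(jdbc_hostname, jdbc_port, src_database, "dim_date", "ecm8yu")).load()`.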

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB, applying the filter, projection and sort independently (each only if supplied)
    db = client[db_name]
    cursor = db[collection].find(conditions or {}, projection or None)
    if sort:
        cursor = cursor.sort(sort)

    # Fill a Python list with the documents and use it to create a DataFrame
    dframe = pd.DataFrame(list(cursor))

    client.close()

    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create one collection per {collection_name: json_file_name} entry and load that file's documents into it.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    # Read in each JSON file and use it to (re)create a collection; dropping first makes re-runs idempotent
    result = None
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

    client.close()

    return result
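Both helpers interpolate the user name and password straight into the `mongodb+srv://` URI. PyMongo's documentation notes that characters such as `@`, `:` or `/` in credentials must be percent-escaped, and recommends `urllib.parse.quote_plus` for this. A minimal sketch of a URI builder that does so; the function name `build_mongo_uri` is hypothetical.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user_id, pwd, cluster_name, db_name):
    """Build a mongodb+srv URI with percent-escaped credentials.

    Without escaping, an '@', ':' or '/' inside the password would be
    parsed as a URI delimiter and the connection string would break.
    """
    return (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
            f"@{cluster_name}.mongodb.net/{db_name}")
```

The result could replace the f-string assigned to `mongo_uri` in `get_mongo_dataframe()` and `set_mongo_collection()`.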

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data from Relational, NoSQL and File System Sources
#### 1.0. Create a New Databricks Metadata Database and Fetch Reference Data from an Azure MySQL Database
##### 1.1. Create the Databricks Metadata Database (sakila_dlh)

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/MassarelliEva-Capstone/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ecm8yu-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_date",
  user "ecm8yu",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/MassarelliEva-Capstone/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20010101,2001-01-01,2001/01/01,01/01/2001,01/01/2001,2,Monday,1,1,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010102,2001-01-02,2001/01/02,01/02/2001,02/01/2001,3,Tuesday,2,2,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010103,2001-01-03,2001/01/03,01/03/2001,03/01/2001,4,Wednesday,3,3,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010104,2001-01-04,2001/01/04,01/04/2001,04/01/2001,5,Thursday,4,4,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
20010105,2001-01-05,2001/01/05,01/05/2001,05/01/2001,6,Friday,5,5,Weekday,1,January,1,N,1,2001,2001-01,2001Q1,7,3,2001,2001-07,2001Q3
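The sample rows suggest how several `dim_date` attributes are derived. A minimal sketch reproducing a subset of them in Python, under assumptions read off the output above rather than taken from the MySQL script that actually built `dim_date`: `date_key` is a `YYYYMMDD` integer, `day_of_week` counts from 1 = Sunday (so Monday = 2, as with MySQL's `DAYOFWEEK()`), and the fiscal year starts in July (January = fiscal month 7). The function name `date_dim_row` is hypothetical.

```python
from datetime import date

DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")


def date_dim_row(d):
    """Derive a few dim_date attributes for one calendar date (simplified sketch)."""
    fiscal_month = (d.month - 7) % 12 + 1               # assumed fiscal year starting in July
    return {
        "date_key": int(d.strftime("%Y%m%d")),           # YYYYMMDD smart key, e.g. 20010101
        "full_date": d.isoformat(),
        "day_of_week": d.isoweekday() % 7 + 1,           # 1 = Sunday ... 7 = Saturday
        "day_name_of_week": DAY_NAMES[d.weekday()],      # locale-independent day name
        "day_of_year": d.timetuple().tm_yday,
        "weekday_weekend": "Weekend" if d.isoweekday() >= 6 else "Weekday",
        "fiscal_month_of_year": fiscal_month,
        "fiscal_quarter": (fiscal_month - 1) // 3 + 1,
    }
```

Because fact rows carry the same `YYYYMMDD` integers (e.g. `rental_date_key` = 20050823), they can be joined to `dim_date.date_key` directly.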


##### 1.3. Create a New Table that Sources Films Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Creates a Temporary View named "view_films" that extracts data from my MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_films
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ecm8yu-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_films",
  user "ecm8yu",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Creates a new table named "sakila_dlh.dim_films" using data from the view named "view_films"
CREATE OR REPLACE TABLE sakila_dlh.dim_films
COMMENT "Films Dimension Table"
LOCATION "dbfs:/FileStore/MassarelliEva-Capstone/sakila_dlh/dim_films"
AS SELECT * FROM view_films

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_films;

col_name,data_type,comment
film_key,bigint,
title,string,
description,string,
release_year,bigint,
language_id,bigint,
original_language_id,string,
rental_duration,bigint,
rental_rate,double,
length,bigint,
replacement_cost,double,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_films LIMIT 5

film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,,6,2.99,130,22.99,G,Deleted Scenes


##### 1.4. Create a New Table that Sources Rental Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Creates a Temporary View named "view_rentals" that extracts data from my MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_rentals
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ecm8yu-mysql.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_rentals",
  user "ecm8yu",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Creates a new table named "sakila_dlh.dim_rentals" using data from the view named "view_rentals"
CREATE OR REPLACE TABLE sakila_dlh.dim_rentals
COMMENT "Rentals Dimension Table"
LOCATION "dbfs:/FileStore/MassarelliEva-Capstone/sakila_dlh/dim_rentals"
AS SELECT * FROM view_rentals

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rentals;

col_name,data_type,comment
rental_key,bigint,
rental_date,string,
inventory_id,bigint,
customer_id,bigint,
return_date,string,
staff_id,bigint,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rentals LIMIT 5

rental_key,rental_date,inventory_id,customer_id,return_date,staff_id
1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1
3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1
4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2
5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1
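As the `DESCRIBE` output shows, `rental_date` and `return_date` arrive as strings, so relating a rental to `dim_date` requires turning the timestamp into the integer date key. A minimal Python sketch of that conversion, assuming the `YYYY-MM-DD HH:MM:SS` format seen in the sample rows; the helper names are hypothetical. In Spark SQL the equivalent would be along the lines of `CAST(date_format(to_timestamp(rental_date), 'yyyyMMdd') AS INT)`.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # format of the rental_date / return_date strings


def to_date_key(ts):
    """Convert a 'YYYY-MM-DD HH:MM:SS' string to a YYYYMMDD integer date key."""
    return int(datetime.strptime(ts, TS_FORMAT).strftime("%Y%m%d"))


def rental_days(rental_date, return_date):
    """Whole days between rental and return, or None if the item is not yet returned."""
    if not return_date:
        return None
    delta = datetime.strptime(return_date, TS_FORMAT) - datetime.strptime(rental_date, TS_FORMAT)
    return delta.days
```

For rental 1 above, `to_date_key("2005-05-24 22:53:30")` gives `20050524`, and the rental lasted one whole day (just under two calendar days).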


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database (NoSQL Database Source)
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/MassarelliEva-Capstone/source_data/batch/Sakila_DimCustomers.json,Sakila_DimCustomers.json,111654,1682984154000
dbfs:/FileStore/MassarelliEva-Capstone/source_data/batch/Sakila_DimFilms.json,Sakila_DimFilms.json,422531,1682984154000
dbfs:/FileStore/MassarelliEva-Capstone/source_data/batch/Sakila_DimInventory.csv,Sakila_DimInventory.csv,9429,1682984154000
dbfs:/FileStore/MassarelliEva-Capstone/source_data/batch/Sakila_DimPayments.csv,Sakila_DimPayments.csv,30010,1682984155000
dbfs:/FileStore/MassarelliEva-Capstone/source_data/batch/Sakila_DimStaff.json,Sakila_DimStaff.json,478,1682984155000
dbfs:/FileStore/MassarelliEva-Capstone/source_data/batch/Sakila_DimStores.csv,Sakila_DimStores.csv,50,1682984155000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/MassarelliEva-Capstone/source_data/batch'
json_files = {"customers" : 'Sakila_DimCustomers.json', "staff" : 'Sakila_DimStaff.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[27]: <pymongo.results.InsertManyResult at 0x7f65100c3e80>
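The idempotency claimed in the note comes from the `drop_collection()` call that precedes `insert_many()`. A pure-Python sketch contrasting that pattern with an append-only load, using a dict as a stand-in for the MongoDB database (an assumption for illustration only; the function names are hypothetical):

```python
def load_collections(store, documents_by_collection):
    """Mimic set_mongo_collection() against an in-memory dict of collections.

    Each target collection is dropped before it is re-populated, so running
    the load twice leaves exactly the same documents behind (idempotent).
    """
    for name, docs in documents_by_collection.items():
        store.pop(name, None)                    # analogous to db.drop_collection(name)
        store[name] = [dict(d) for d in docs]    # analogous to insert_many(docs)
    return store


def append_only_load(store, documents_by_collection):
    """The same load without the drop step: every re-run duplicates documents."""
    for name, docs in documents_by_collection.items():
        store.setdefault(name, []).extend(dict(d) for d in docs)
    return store
```

Running `load_collections` twice on the two staff documents still leaves two documents; running `append_only_load` twice leaves four.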

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "customers").load()
.select("customer_key","store_id","first_name","last_name","email","address_id","active")

display(df_customer.limit(10))

customer_key,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


##### 2.4.1. Fetch Staff Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_staff = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "staff").load()
.select("staff_key","first_name","last_name","address_id","email","store_id","active","username","password")

display(df_staff.limit(10))

staff_key,first_name,last_name,address_id,email,store_id,active,username,password
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


In [0]:
%scala
df_staff.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Staff Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
password,string,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 2

staff_key,first_name,last_name,address_id,email,store_id,active,username,password
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


#### 3.0. Fetch Data from a File System (the Databricks File System, DBFS)
##### 3.1. Use PySpark to Read Store Dimension Data from a CSV File

In [0]:
stores_csv = f"{batch_dir}/Sakila_DimStores.csv"

df_stores = spark.read.format('csv').options(header='true', inferSchema='true').load(stores_csv)
display(df_stores)

store_key,manager_staff_id,address_id
1,1,1
2,2,2


In [0]:
df_stores.printSchema()

root
 |-- store_key: integer (nullable = true)
 |-- manager_staff_id: integer (nullable = true)
 |-- address_id: integer (nullable = true)



In [0]:
df_stores.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_stores")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_stores;

col_name,data_type,comment
store_key,int,
manager_staff_id,int,
address_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_stores,
Type,MANAGED,
Location,dbfs:/FileStore/MassarelliEva-Capstone/sakila_dlh/dim_stores,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_stores LIMIT 2;

store_key,manager_staff_id,address_id
1,1,1
2,2,2


##### 3.2. Use PySpark to Read Inventory Dimension Data from a CSV File

In [0]:
inventory_csv = f"{batch_dir}/Sakila_DimInventory.csv"

df_inventory = spark.read.format('csv').options(header='true', inferSchema='true').load(inventory_csv)
display(df_inventory.limit(10))

inventory_key,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2
6,1,2
7,1,2
8,1,2
9,2,2
10,2,2


In [0]:
df_inventory.printSchema()

root
 |-- inventory_key: integer (nullable = true)
 |-- film_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)



In [0]:
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_key,int,
film_id,int,
store_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,MANAGED,
Location,dbfs:/FileStore/MassarelliEva-Capstone/sakila_dlh/dim_inventory,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5;

inventory_key,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


##### 3.3. Use PySpark to Read Payment Dimension Data from a CSV File

In [0]:
payments_csv = f"{batch_dir}/Sakila_DimPayments.csv"

df_payments = spark.read.format('csv').options(header='true', inferSchema='true').load(payments_csv)
display(df_payments.limit(10))

payment_key,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,76,2.99,2005-08-23
2,1,1,573,0.99,2005-08-23
3,1,1,1185,5.99,2005-08-23
4,1,2,1422,0.99,2005-08-23
5,1,2,1476,9.99,2005-08-23
6,1,1,1725,4.99,2005-08-23
7,1,1,2308,4.99,2005-08-23
8,1,2,2363,0.99,2005-08-23
9,1,1,3284,3.99,2005-08-23
10,1,2,4526,5.99,2005-08-23


In [0]:
df_payments.printSchema()

root
 |-- payment_key: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: date (nullable = true)
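With `inferSchema='true'`, Spark makes an extra pass over the file and picks a type per column, which is why `amount` came back as `double` and `payment_date` as `date`. A much-simplified pure-Python analogue of that idea, for illustration only (it is not Spark's actual inference algorithm, and the function names are hypothetical); supplying an explicit schema built from the `StructType`/`StructField` types imported at the top of the notebook would avoid the extra pass.

```python
import csv
import io
from datetime import date


def infer_type(values):
    """Pick the first of integer, double, date or string that fits every value.

    A simplified illustration only; Spark's inferSchema has its own, richer rules.
    """
    def fits(parse):
        try:
            for v in values:
                parse(v)
            return True
        except ValueError:
            return False

    if fits(int):
        return "integer"
    if fits(float):
        return "double"
    if fits(date.fromisoformat):
        return "date"
    return "string"


def infer_csv_schema(text):
    """Return {column: type} for CSV text that has a header row."""
    rows = list(csv.DictReader(io.StringIO(text)))
    return {col: infer_type([r[col] for r in rows]) for col in rows[0]}
```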



In [0]:
df_payments.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payments")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payments;

col_name,data_type,comment
payment_key,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,date,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payments LIMIT 5;

payment_key,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,76,2.99,2005-08-23
2,1,1,573,0.99,2005-08-23
3,1,1,1185,5.99,2005-08-23
4,1,2,1422,0.99,2005-08-23
5,1,2,1476,9.99,2005-08-23


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_films,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_payments,False
sakila_dlh,dim_rentals,False
sakila_dlh,dim_staff,False
sakila_dlh,dim_stores,False
,view_date,True
,view_films,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use Auto Loader to Process Streaming (Hot Path) Rentals Fact Data
##### 6.1. Bronze Table: Process 'Raw' JSON Data

To find the column data types used for the schema hints below, open MySQL Workbench, right-click the source table, and view its CREATE TABLE statement.

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # Schema hints must be passed as ONE comma-separated string: repeated .option() calls with
 # the same key overwrite each other, so only the last hint would take effect.
 # Date keys are YYYYMMDD integers (e.g. 20050823), hence BIGINT, matching DESCRIBE below.
 .option("cloudFiles.schemaHints",
         "fact_order_key BIGINT, rental_key BIGINT, store_key BIGINT, film_key BIGINT, "
         "customer_key BIGINT, staff_key BIGINT, rental_date_key BIGINT, "
         "return_date_key BIGINT, payment_date_key BIGINT, amount DOUBLE")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))
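Spark reader options behave like a map with one value per key, so chaining `.option("cloudFiles.schemaHints", ...)` several times keeps only the last value; Auto Loader expects all hints in a single comma-separated string. A minimal sketch of both points in plain Python (the helper names are hypothetical, and the dict only models the option map):

```python
def apply_options(pairs):
    """Mimic chained .option(key, value) calls: a later value for the same key
    replaces the earlier one, as Spark keeps one value per option key."""
    options = {}
    for key, value in pairs:
        options[key] = value
    return options


def build_schema_hints(column_types):
    """Join {column: SQL type} pairs into one cloudFiles.schemaHints string."""
    return ", ".join(f"{col} {sql_type}" for col, sql_type in column_types.items())
```

For example, `.option("cloudFiles.schemaHints", build_schema_hints({"rental_key": "BIGINT", "amount": "DOUBLE"}))` passes both hints in one call.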

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview LIMIT 5;

amount,customer_key,fact_order_key,film_key,payment_date_key,rental_date_key,rental_key,return_date_key,staff_key,store_key,_rescued_data,receipt_time,source_file
0.99,19,501,875,20050823,20050823,9256,20050830,1,2,,2023-05-02T00:02:52.667+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders03.json
9.99,19,502,506,20050823,20050823,10077,20050830,1,2,,2023-05-02T00:02:52.667+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders03.json
7.99,19,503,263,20050823,20050823,10176,20050830,2,1,,2023-05-02T00:02:52.667+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders03.json
8.99,19,504,590,20050823,20050823,11508,20050830,2,1,,2023-05-02T00:02:52.667+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders03.json
5.99,19,505,757,20050823,20050823,11869,20050830,1,1,,2023-05-02T00:02:52.667+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders03.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[63]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f6508317730>
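Auto Loader records which files it has already ingested under the checkpoint location, so each new file landing in `rentals_stream_dir` is picked up once, in the next micro-batch. A toy pure-Python model of that bookkeeping, which also adds the `receipt_time` and `source_file` columns the way `rentals_bronze_tempview` does. The set standing in for the checkpoint, the list standing in for the bronze table, and the function name are all hypothetical; this is not how Auto Loader is implemented internally.

```python
import json
import os
from datetime import datetime, timezone


def ingest_new_files(source_dir, processed, sink):
    """Process only JSON files not seen before, tagging each record with lineage metadata.

    `processed` is a set of already-ingested paths (stand-in for the checkpoint);
    `sink` is a list (stand-in for the bronze table). Returns the number of files
    handled in this run, i.e. the size of this "mini-batch".
    """
    new_files = sorted(
        os.path.join(source_dir, name)
        for name in os.listdir(source_dir)
        if name.endswith(".json") and os.path.join(source_dir, name) not in processed
    )
    receipt_time = datetime.now(timezone.utc).isoformat()
    for path in new_files:
        with open(path) as f:
            for record in json.load(f):
                sink.append({**record, "receipt_time": receipt_time, "source_file": path})
        processed.add(path)
    return len(new_files)
```

Dropping `Sakila_DimFactOrders01.json`, `02` and `03` into the folder at different times would therefore yield three separate ingestion intervals.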

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview LIMIT 5;

amount,customer_key,fact_order_key,film_key,payment_date_key,rental_date_key,rental_key,return_date_key,staff_key,store_key,_rescued_data,receipt_time,source_file
2.99,1,1,663,20050823,20050823,76,20050830,2,2,,2023-05-01T23:56:58.275+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders01.json
0.99,1,2,875,20050823,20050823,573,20050830,1,2,,2023-05-01T23:56:58.275+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders01.json
5.99,1,3,611,20050823,20050823,1185,20050830,2,1,,2023-05-01T23:56:58.275+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders01.json
0.99,1,4,228,20050823,20050823,1422,20050830,2,2,,2023-05-01T23:56:58.275+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders01.json
9.99,1,5,308,20050823,20050823,1476,20050830,1,1,,2023-05-01T23:56:58.275+0000,dbfs:/FileStore/MassarelliEva-Capstone/source_data/stream/rentals/Sakila_DimFactOrders01.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
fact_order_key,bigint,
film_key,bigint,
payment_date_key,bigint,
rental_date_key,bigint,
rental_key,bigint,
return_date_key,bigint,
staff_key,bigint,
store_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.fact_order_key,
      r.rental_key,
      re.inventory_id,
      r.store_key,
      st.manager_staff_id,
      r.film_key,
      f.title AS film_name,
      f.rating AS film_rating,
      f.rental_duration,
      f.replacement_cost,
      r.customer_key,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      c.email AS customer_email,
      c.address_id AS customer_address_id,
      r.staff_key,
      s.first_name AS staff_first_name,
      s.last_name AS staff_last_name,
      s.email AS staff_email,
      r.rental_date_key,
      rd.day_name_of_week AS rental_day_name_of_week,
      rd.day_of_month AS rental_day_of_month,
      rd.weekday_weekend AS rental_weekday_weekend,
      rd.month_name AS rental_month_name,
      rd.calendar_quarter AS rental_quarter,
      rd.calendar_year AS rental_year,
      r.return_date_key,
      red.day_name_of_week AS return_day_name_of_week,
      red.day_of_month AS return_day_of_month,
      red.weekday_weekend AS return_weekday_weekend,
      red.month_name AS return_month_name,
      red.calendar_quarter AS return_quarter,
      red.calendar_year AS return_year,
      r.payment_date_key,
      pd.day_name_of_week AS payment_day_name_of_week,
      pd.day_of_month AS payment_day_of_month,
      pd.weekday_weekend AS payment_weekday_weekend,
      pd.month_name AS payment_month_name,
      pd.calendar_quarter AS payment_quarter,
      pd.calendar_year AS payment_year,
      r.amount AS rental_price
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_rentals AS re
  ON re.rental_key = r.rental_key
  INNER JOIN sakila_dlh.dim_stores AS st
  ON st.store_key = r.store_key
  INNER JOIN sakila_dlh.dim_films AS f
  ON f.film_key = r.film_key
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS red
  ON red.date_key = r.return_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS pd
  ON pd.date_key = r.payment_date_key
)
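The view mixes two join types: INNER joins to the store, film, customer, staff and rental dimensions (a fact row with no matching dimension row is dropped) and LEFT OUTER joins to `dim_date` (the fact row is kept, with NULL date attributes). A minimal pure-Python sketch of that difference using just the customer and date dimensions, assuming each dimension key is unique; the function name and the trimmed-down columns are hypothetical.

```python
def enrich(facts, customers, dates):
    """Join fact rows to two dimensions the way fact_rentals_silver_tempview does:
    INNER to customers (unmatched facts are dropped) and LEFT OUTER to dates
    (unmatched facts are kept with None). Assumes unique dimension keys."""
    cust_by_key = {c["customer_key"]: c for c in customers}
    date_by_key = {d["date_key"]: d for d in dates}
    out = []
    for f in facts:
        c = cust_by_key.get(f["customer_key"])
        if c is None:                                   # INNER JOIN: no match, row discarded
            continue
        d = date_by_key.get(f["rental_date_key"], {})   # LEFT OUTER JOIN: row always kept
        out.append({
            "fact_order_key": f["fact_order_key"],
            "customer_last_name": c["last_name"],
            "rental_month_name": d.get("month_name"),
        })
    return out
```

One consequence worth checking in the silver table: any rentals whose `customer_key` (or store, film, staff or rental key) is missing from its dimension silently disappear rather than surfacing as NULLs.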

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[68]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f650831ca30>

In [0]:
%sql
SELECT * FROM fact_rentals_silver LIMIT 5;

fact_order_key,rental_key,inventory_id,store_key,manager_staff_id,film_key,film_name,film_rating,rental_duration,replacement_cost,customer_key,customer_first_name,customer_last_name,customer_email,customer_address_id,staff_key,staff_first_name,staff_last_name,staff_email,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,payment_date_key,payment_day_name_of_week,payment_day_of_month,payment_weekday_weekend,payment_month_name,payment_quarter,payment_year,rental_price
489,18,3376,1,1,741,ROMAN PUNK,NC-17,7,28.99,19,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,23,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050823,Tuesday,23,Weekday,August,3,2005,20050830,Tuesday,30,Weekday,August,3,2005,20050823,Tuesday,23,Weekday,August,3,2005,0.99
174,46,3318,1,1,730,RIDGEMONT SUBMARINE,PG-13,3,28.99,7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050823,Tuesday,23,Weekday,August,3,2005,20050830,Tuesday,30,Weekday,August,3,2005,20050823,Tuesday,23,Weekday,August,3,2005,5.99
951,47,2211,2,2,478,JAWS HARRY,G,4,10.99,35,VIRGINIA,GREEN,VIRGINIA.GREEN@sakilacustomer.org,39,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,20050823,Tuesday,23,Weekday,August,3,2005,20050830,Tuesday,30,Weekday,August,3,2005,20050823,Tuesday,23,Weekday,August,3,2005,3.99
467,50,1983,2,2,432,HOPE TOOTSIE,NC-17,4,22.99,18,CAROL,GARCIA,CAROL.GARCIA@sakilacustomer.org,22,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050823,Tuesday,23,Weekday,August,3,2005,20050830,Tuesday,30,Weekday,August,3,2005,20050823,Tuesday,23,Weekday,August,3,2005,2.99
146,57,3938,2,2,858,SUBMARINE BED,R,5,21.99,6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,20050823,Tuesday,23,Weekday,August,3,2005,20050830,Tuesday,30,Weekday,August,3,2005,20050823,Tuesday,23,Weekday,August,3,2005,4.99


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_order_key,bigint,
rental_key,bigint,
inventory_id,bigint,
store_key,bigint,
manager_staff_id,int,
film_key,bigint,
film_name,string,
film_rating,string,
rental_duration,bigint,
replacement_cost,double,


##### 6.3. Gold Table: Perform Aggregations (NOTE: the required visualization is a tab on this cell's output)

In [0]:
%sql
SELECT customer_key AS CustomerID
  , customer_last_name AS LastName
  , customer_first_name AS FirstName
  , rental_month_name AS RentalMonth
  , COUNT(rental_key) AS RentalCount
  , SUM(rental_price) AS TotalSpent
FROM sakila_dlh.fact_rentals_silver
GROUP BY CustomerID, LastName, FirstName, RentalMonth
ORDER BY RentalCount DESC;

CustomerID,LastName,FirstName,RentalMonth,RentalCount,TotalSpent
5,BROWN,ELIZABETH,August,38,144.61999999999995
29,HERNANDEZ,ANGELA,August,36,140.63999999999996
21,CLARK,MICHELLE,August,35,155.65
30,KING,MELISSA,August,34,123.65999999999993
26,HALL,JESSICA,August,34,152.65999999999997
7,MILLER,MARIA,August,33,151.67
15,HARRIS,HELEN,August,32,134.68
35,GREEN,VIRGINIA,August,32,129.67999999999995
1,SMITH,MARY,August,32,118.67999999999992
28,YOUNG,CYNTHIA,August,32,111.67999999999996
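`TotalSpent` values such as `144.61999999999995` are binary floating-point artifacts: `rental_price` is a `double`, and summing many two-decimal amounts accumulates tiny representation errors. In SQL, `ROUND(SUM(rental_price), 2)` or casting to `DECIMAL(10,2)` before summing would tidy the totals. A minimal Python sketch of the same grouping using `decimal.Decimal` for exact cents; the function name `summarize` is hypothetical, and the test prices are customer 1's amounts from the `dim_payments` sample above.

```python
from collections import defaultdict
from decimal import Decimal


def summarize(rows):
    """Group rentals by (customer_key, rental_month_name); return (count, total).

    Prices are converted through their string form to Decimal, so cents add up
    exactly instead of accumulating binary floating-point error.
    """
    totals = defaultdict(lambda: [0, Decimal("0")])
    for r in rows:
        acc = totals[(r["customer_key"], r["rental_month_name"])]
        acc[0] += 1
        acc[1] += Decimal(str(r["rental_price"]))
    return {key: (count, total) for key, (count, total) in totals.items()}
```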



#### 7.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/MassarelliEva-Capstone/