## DS 2002 Final Project: Film Rental Information

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd 
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): jdbc_hostname, jdbc_port, src_database and connection_properties are not
# referenced by any other cell visible in this notebook (the %sql cell that builds
# view_date spells out its own JDBC options) — confirm whether they are still needed.
jdbc_hostname = "ds2002-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

# NOTE(review): the user name and password are stored in plain text in the notebook.
# Move them to a secret store (or environment variables) before sharing or committing.
connection_properties = {
  "user" : "nuq3tq",
  "password" : "Passw0rd123!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# These four values are passed to set_mongo_collection() when the JSON files are uploaded.
# NOTE(review): atlas_password is hardcoded in plain text as well.
atlas_cluster_name = "devcluster.qlkwbnz"
atlas_database_name = "sakila_dw"
atlas_user_name = "nuq3tq"
atlas_password = "Passw0rd123!"

# Data Files (JSON) Information ###############################
# Name of the destination database; it is also the last path segment of database_dir.
dst_database = "sakila_dlh"

# Root DBFS folder of the project, and the folder that holds the destination database files.
base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

# Source data folders: batch (reference) files and stream (rentals fact) files.
data_dir = f"{base_dir}/data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# The rentals stream is read directly from the stream folder (no sub-folder).
rentals_stream_dir = f"{stream_dir}"

# Output folders for the bronze / silver / gold stages of the rentals fact data.
# NOTE(review): rentals_output_gold is not used by any cell visible in this notebook.
rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ##################################
# The second argument (True) is presumably the "recurse" flag of dbutils.fs.rm — TODO confirm.
dbutils.fs.rm(f"{database_dir}/fact_rentals", True)

# Delete the Database Files ###################################
# database_dir is the parent of the fact_rentals folder removed just above.
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection and return them as a DataFrame.

    Args:
        user_id: Atlas user name (raw text; it is percent-escaped before use).
        pwd: Atlas password (raw text; it is percent-escaped before use).
        cluster_name: Host prefix placed in front of ".mongodb.net".
        db_name: Name of the database to query.
        collection: Name of the collection to query.
        conditions: Filter document; any falsy value matches every document.
        projection: Projection document; any falsy value returns every field.
        sort: Sort specification handed to the cursor's sort(); any falsy value
            leaves the documents unsorted.

    Returns:
        A DataFrame (built with the notebook's `pd` alias) holding the matching documents.
    """
    from urllib.parse import quote_plus

    # Escape the credentials so characters such as '@', ':' or '/' cannot corrupt the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Each argument is honoured on its own: previously the filter was dropped unless a
        # projection was also supplied, and the sort was dropped without both of them.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)

        # Materialize the cursor into a list before the connection is closed.
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Release the connection even when the query or the conversion fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """Create (or replace) MongoDB collections from JSON files.

    Args:
        user_id: Atlas user name (raw text; it is percent-escaped before use).
        pwd: Atlas password (raw text; it is percent-escaped before use).
        cluster_name: Host prefix placed in front of ".mongodb.net".
        db_name: Name of the database that receives the collections.
        src_file_path: Directory that contains the JSON files.
        json_files: Mapping of collection name -> JSON file name inside src_file_path.

    Returns:
        The result of the last insert_many() call, or None when json_files is empty.
    """
    from urllib.parse import quote_plus

    # Escape the credentials so characters such as '@', ':' or '/' cannot corrupt the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty mapping returns None instead of raising UnboundLocalError.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(src_file_path, file_name)

            # Parse the file first: a missing or malformed file must not cost us the
            # collection that already exists on the server.
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            result = db[collection_name].insert_many(documents)
    finally:
        # Release the connection even when reading a file or inserting fails.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: drop the sakila_dlh database if it exists (CASCADE is specified, so its tables go with it).
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Create the sakila_dlh database with its files under the project folder on DBFS.
-- The LOCATION is the same path that the global-variables cell builds as `database_dir`.
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
-- Expose the MySQL table dim_date (database sakila_dw) as a temporary view read over JDBC.
-- NOTE(review): the user name and password are hardcoded in this cell; move them to a
-- secret store before sharing or committing the notebook.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "nuq3tq",    --Replace with your User Name
  password "Passw0rd123!"  --Replace with your password
)

In [0]:
%sql
-- Make sakila_dlh the current database; later cells refer to some tables without a database prefix.
USE DATABASE sakila_dlh;

-- Copy the rows of the JDBC-backed view_date into a table stored under the database folder.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column definitions and table metadata of the new date dimension.
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- Preview five rows to confirm that the date dimension was loaded.
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # batch_dir is 'dbfs:/FileStore/final_data/data/batch'

path,name,size,modificationTime
dbfs:/FileStore/final_data/data/batch/Sakila_DimCustomers.csv,Sakila_DimCustomers.csv,33834,1714432107000
dbfs:/FileStore/final_data/data/batch/Sakila_DimFilms.json,Sakila_DimFilms.json,441424,1714432107000
dbfs:/FileStore/final_data/data/batch/Sakila_DimInventories.json,Sakila_DimInventories.json,91294,1714432107000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
# The batch folder in its /dbfs/... form: set_mongo_collection() reads the files with
# Python's open(), so it is given this path rather than the dbfs:/ URI held in batch_dir.
source_dir = '/dbfs/FileStore/final_data/data/batch'

# Target MongoDB collection name -> JSON file that populates it.
json_files = {
    "inventories": 'Sakila_DimInventories.json',
    "films": 'Sakila_DimFilms.json',
}

# Upload both files; the value of this last expression is shown as the cell output.
set_mongo_collection(
    user_id=atlas_user_name,
    pwd=atlas_password,
    cluster_name=atlas_cluster_name,
    db_name=atlas_database_name,
    src_file_path=source_dir,
    json_files=json_files,
)

<pymongo.results.InsertManyResult at 0x7f4878768180>

##### 2.3.1. Fetch Inventories Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Build the MongoDB Atlas connection URI that the following %scala cells pass to the Spark reader.
// NOTE(review): the user name and password are hardcoded in plain text (and repeat the values
// already defined in the Python global-variables cell); move them to a secret store.
val userName = "nuq3tq"
val pwd = "Passw0rd123!"
val clusterName = "devcluster.qlkwbnz"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Read the "inventories" collection of the sakila_dw database through the MongoDB Spark
// connector, keeping only the four columns selected below.
// atlas_uri comes from the previous %scala cell.
val df_inventories = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "inventories").load()
.select("inventory_key","inventory_id","film_id","store_id")

display(df_inventories)

inventory_key,inventory_id,film_id,store_id
1,1,1,1
2,2,1,1
3,3,1,1
4,4,1,1
5,5,1,2
6,6,1,2
7,7,1,2
8,8,1,2
9,9,2,2
10,10,2,2


In [0]:
%scala
// Print the schema of the inventories DataFrame.
df_inventories.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Inventories Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Save the DataFrame as the table sakila_dlh.dim_inventories in Delta format, overwriting any existing data.
df_inventories.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventories")

In [0]:
%sql
-- Inspect the column definitions and table metadata of the inventories dimension.
DESCRIBE EXTENDED sakila_dlh.dim_inventories

col_name,data_type,comment
inventory_key,int,
inventory_id,int,
film_id,int,
store_id,int,
,,
# Delta Statistics Columns,,
Column Names,"inventory_key, inventory_id, film_id, store_id",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
-- Preview five rows to confirm that the inventories dimension was loaded.
SELECT * FROM sakila_dlh.dim_inventories LIMIT 5

inventory_key,inventory_id,film_id,store_id
1,1,1,1
2,2,1,1
3,3,1,1
4,4,1,1
5,5,1,2


##### 2.4.1. Fetch Films Dimension Data from the New MongoDB Collection

In [0]:
%scala
// NOTE(review): this import repeats the one in the earlier %scala cell that defines atlas_uri.
import com.mongodb.spark._

// Read the "films" collection of the sakila_dw database through the MongoDB Spark connector,
// keeping only the columns selected below. atlas_uri comes from the earlier %scala cell.
// NOTE(review): the URI is passed under the option key "uri" here, whereas the inventories
// cell uses "spark.mongodb.input.uri" — confirm which key is intended and use it in both.
val df_films = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw")
.option("collection", "films")
.option("uri", atlas_uri).load()
.select("film_key", "film_id", "title", "description", "release_year", "rental_duration", "rental_rate", "length", "replacement_cost", "rating", "special_features")

display(df_films)

film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,6,2.99,130,22.99,G,Deleted Scenes
6,6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,3,2.99,169,17.99,PG,Deleted Scenes
7,7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes"
8,8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,6,4.99,54,15.99,R,Trailers
9,9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes"
10,10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes"


In [0]:
%scala
// Print the schema of the films DataFrame.
df_films.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Films Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala

// Save the DataFrame as the table sakila_dlh.dim_films in Delta format, overwriting any existing data.
df_films.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_films")

In [0]:
%sql
-- Inspect the column definitions and table metadata of the films dimension.
DESCRIBE EXTENDED sakila_dlh.dim_films

col_name,data_type,comment
film_key,int,
film_id,int,
title,string,
description,string,
release_year,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,


In [0]:
%sql
-- Preview five rows to confirm that the films dimension was loaded.
SELECT * FROM sakila_dlh.dim_films LIMIT 5

film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,6,2.99,130,22.99,G,Deleted Scenes


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Location of the customers CSV file inside the batch folder.
customer_csv = f"{batch_dir}/Sakila_DimCustomers.csv"

# Read the file with a header row and let Spark infer the column types.
df_customer = (
    spark.read
    .format('csv')
    .option("header", 'true')
    .option("inferSchema", 'true')
    .load(customer_csv)
)
display(df_customer)

customer_key,customer_id,store_id,first_name,last_name,email
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org
6,6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org
7,7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org
8,8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org
9,9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org
10,10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org


In [0]:
# Print the column types that Spark inferred for the customers CSV.
df_customer.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)



In [0]:
# Save the DataFrame as the table sakila_dlh.dim_customer in Delta format, overwriting any existing data.
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
-- Inspect the column definitions and table metadata of the customer dimension.
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
,,
# Delta Statistics Columns,,
Column Names,"first_name, customer_id, email, store_id, last_name, customer_key",
Column Selection Method,first-32,


In [0]:
%sql
-- Preview five rows to confirm that the customer dimension was loaded.
SELECT * FROM sakila_dlh.dim_customer LIMIT 5;

customer_key,customer_id,store_id,first_name,last_name,email
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org


##### Verify Dimension Tables

In [0]:
%sql
-- Switch to sakila_dlh and list its tables to verify that the dimension tables were created.
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_films,False
sakila_dlh,dim_inventories,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,fact_rentals_silver_tempview,True
,rentals_bronze_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Rentals Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Stream the JSON files of the rentals stream folder with Auto Loader ("cloudFiles") and
# register the streaming DataFrame as the temporary view rentals_raw_tempview.
(spark.readStream
 .format("cloudFiles")
 .options(**{
     "cloudFiles.format": "json",
     "cloudFiles.schemaLocation": rentals_output_bronze,
     "cloudFiles.inferColumnTypes": "true",
     "multiLine": "true",
 })
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Add metadata for traceability: every raw row is kept as-is and gains two columns,
   receipt_time (current_timestamp()) and source_file (input_file_name()). */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
-- Display the bronze view (the raw rentals plus the two metadata columns).
SELECT * FROM rentals_bronze_tempview

customer_key,fact_rental_key,film_key,inventory_key,last_update_key,rental_date_key,rental_id,return_date_key,_rescued_data,receipt_time,source_file
157,142,28,140,20060215,20050528,642,20050601,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
452,143,206,933,20060215,20050529,726,20050605,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
198,144,158,719,20060215,20050530,932,20050531,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
152,145,127,581,20060215,20050529,745,20050601,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
134,146,79,360,20060215,20050527,366,20050604,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
508,147,209,944,20060215,20050527,369,20050601,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
306,148,152,697,20060215,20050528,672,20050606,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
584,149,100,452,20060215,20050528,626,20050601,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
298,150,213,955,20060215,20050527,383,20050603,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
28,151,96,434,20060215,20050527,388,20050530,,2024-04-30T00:31:22.991Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json


In [0]:
# Start the bronze writer: append the rows of rentals_bronze_tempview to the table
# fact_rentals_bronze, tracking progress in a checkpoint folder under the bronze output path.
bronze_query = (spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

# Starting a stream returns immediately. Wait until the data that is already available has
# been committed, so that the cells below do not read a partially loaded bronze table.
bronze_query.processAllAvailable()

# Keep the StreamingQuery handle as the cell output.
bronze_query

<pyspark.sql.streaming.query.StreamingQuery at 0x7f48604f62c0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Read the bronze table as a stream and register it as the temporary view
# rentals_silver_tempview, which the silver query below joins with the dimension tables.
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
-- Display the rows streamed from the bronze table.
SELECT * FROM rentals_silver_tempview

customer_key,fact_rental_key,film_key,inventory_key,last_update_key,rental_date_key,rental_id,return_date_key,_rescued_data,receipt_time,source_file
130,1,80,367,20060215,20050524,1,20050526,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
269,2,159,730,20060215,20050529,750,20050530,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
239,3,158,723,20060215,20050527,444,20050601,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
399,4,129,592,20060215,20050529,694,20050605,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
261,5,4,20,20060215,20050527,465,20050602,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
446,6,28,141,20060215,20050527,355,20050601,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
319,7,78,354,20060215,20050527,439,20050602,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
316,8,86,389,20060215,20050525,16,20050526,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
575,9,181,830,20060215,20050525,17,20050527,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
575,10,164,752,20060215,20050527,395,20050531,,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json


In [0]:
%sql
-- Inspect the column types of the streamed bronze data before joining it with the dimensions.
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_rental_key,bigint,
film_key,bigint,
inventory_key,bigint,
last_update_key,bigint,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
-- Silver view: enrich every streamed rental with attributes from the customer, film,
-- inventory and date dimensions. dim_date is joined three times, once per date key
-- (rd = rental date, rd2 = return date, d = last update).
-- NOTE(review): all joins are INNER JOINs, so a rental whose key has no match in a
-- dimension (for example a NULL return_date_key, if the data contains any) is dropped
-- from the silver data — confirm that this is intended.
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT 
      r.fact_rental_key,
      r.rental_id,
      r.rental_date_key,
      rd.full_date AS rental_date,
      r.return_date_key,
      rd2.full_date AS return_date,
      r.customer_key,
      c.customer_id,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      f.film_key,
      f.film_id,
      f.title AS film_title,
      f.description AS film_description,
      f.release_year AS film_release_year,
      f.rental_duration AS film_rental_duration,
      f.rental_rate AS film_rental_rate,
      f.length AS film_length,
      f.replacement_cost AS film_replacement_cost,
      f.rating AS film_rating,
      f.special_features AS film_special_features,
      i.inventory_key,
      i.inventory_id,
      d.date_key AS last_update_key,
      d.full_date AS last_update_date,
      -- Traceability columns added in the bronze view.
      r.receipt_time,
      r.source_file
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON r.customer_key = c.customer_key
  INNER JOIN sakila_dlh.dim_films AS f
  ON r.film_key = f.film_key
  INNER JOIN sakila_dlh.dim_inventories AS i
  ON r.inventory_key = i.inventory_key
  -- Date dimension in its three roles.
  INNER JOIN sakila_dlh.dim_date AS rd
  ON r.rental_date_key = rd.date_key
  INNER JOIN sakila_dlh.dim_date AS rd2
  ON r.return_date_key = rd2.date_key
  INNER JOIN sakila_dlh.dim_date AS d
  ON r.last_update_key = d.date_key
)

In [0]:
# Start the silver writer: append the rows of fact_rentals_silver_tempview to the table
# fact_rentals_silver, tracking progress in a checkpoint folder under the silver output path.
silver_query = (spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

# Starting a stream returns immediately. Wait until the data that is already available has
# been committed; the gold tables below are one-off snapshots of fact_rentals_silver and
# would otherwise be computed from a partially loaded table.
silver_query.processAllAvailable()

# Keep the StreamingQuery handle as the cell output.
silver_query

<pyspark.sql.streaming.query.StreamingQuery at 0x7f48604f5030>

In [0]:
%sql
-- Display the silver table; the unqualified name resolves through the earlier USE sakila_dlh.
SELECT * FROM fact_rentals_silver

fact_rental_key,rental_id,rental_date_key,rental_date,return_date_key,return_date,customer_key,customer_id,customer_first_name,customer_last_name,film_key,film_id,film_title,film_description,film_release_year,film_rental_duration,film_rental_rate,film_length,film_replacement_cost,film_rating,film_special_features,inventory_key,inventory_id,last_update_key,last_update_date,receipt_time,source_file
33,577,20050528,2005-05-28,20050601,2005-06-01,6,6,JENNIFER,DAVIS,83,83,BLUES INSTINCT,A Insightful Documentary of a Boat And a Composer who must Meet a Forensic Psychologist in An Abandoned Fun House,2006,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",375,375,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
141,877,20050530,2005-05-30,20050602,2005-06-02,9,9,MARGARET,MOORE,196,196,CRUELTY UNFORGIVEN,A Brilliant Tale of a Car And a Moose who must Battle a Dentist in Nigeria,2006,7,0.99,69,29.99,G,"Deleted Scenes,Behind the Scenes",886,886,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals02.json
212,987,20050530,2005-05-30,20050607,2005-06-07,11,11,LISA,ANDERSON,86,86,BOOGIE AMELIE,A Lacklusture Character Study of a Husband And a Sumo Wrestler who must Succumb a Technical Writer in The Gulf of Mexico,2006,6,4.99,121,11.99,R,"Commentaries,Behind the Scenes",390,390,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
29,692,20050529,2005-05-29,20050602,2005-06-02,18,18,CAROL,GARCIA,174,174,CONFIDENTIAL INTERVIEW,A Stunning Reflection of a Cat And a Woman who must Find a Astronaut in Ancient Japan,2006,6,4.99,180,13.99,NC-17,Commentaries,800,800,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
12,591,20050528,2005-05-28,20050529,2005-05-29,19,19,RUTH,MARTINEZ,83,83,BLUES INSTINCT,A Insightful Documentary of a Boat And a Composer who must Meet a Forensic Psychologist in An Abandoned Fun House,2006,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",377,377,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json
102,546,20050528,2005-05-28,20050601,2005-06-01,20,20,SHARON,ROBINSON,109,109,BUTTERFLY CHOCOLAT,A Fateful Story of a Girl And a Composer who must Conquer a Husband in A Shark Tank,2006,3,0.99,89,17.99,G,Behind the Scenes,487,487,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals02.json
120,463,20050527,2005-05-27,20050604,2005-06-04,21,21,MICHELLE,CLARK,181,181,CONTACT ANONYMOUS,A Insightful Display of a A Shark And a Monkey who must Face a Database Administrator in Ancient India,2006,7,2.99,166,10.99,PG-13,Commentaries,826,826,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals02.json
151,388,20050527,2005-05-27,20050530,2005-05-30,28,28,CYNTHIA,YOUNG,96,96,BREAKING HOME,A Beautiful Display of a Secret Agent And a Monkey who must Battle a Sumo Wrestler in An Abandoned Mine Shaft,2006,4,2.99,169,21.99,PG-13,"Trailers,Commentaries",434,434,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals03.json
140,716,20050529,2005-05-29,20050601,2005-06-01,36,36,KATHLEEN,ADAMS,17,17,ALONE TRIP,A Fast-Paced Character Study of a Composer And a Dog who must Outgun a Boat in An Abandoned Fun House,2006,3,0.99,82,14.99,R,"Trailers,Behind the Scenes",85,85,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals02.json
21,99,20050525,2005-05-25,20050528,2005-05-28,44,44,MARIE,TURNER,117,117,CANDLES GRAPES,A Fanciful Character Study of a Monkey And a Explorer who must Build a Astronaut in An Abandoned Fun House,2006,6,4.99,135,15.99,NC-17,"Trailers,Deleted Scenes",535,535,20060215,2006-02-15,2024-04-30T00:32:05.53Z,dbfs:/FileStore/final_data/data/stream/Sakila_FactRentals01.json


In [0]:
%sql
-- Inspect the column definitions and table metadata of the silver fact table.
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_rental_key,bigint,
rental_id,bigint,
rental_date_key,bigint,
rental_date,date,
return_date_key,bigint,
return_date,date,
customer_key,bigint,
customer_id,int,
customer_first_name,string,
customer_last_name,string,


##### 6.3. Gold Table: Perform Aggregations

This query creates a gold table that counts the rentals of each customer on each rental date.

In [0]:
%sql
-- Gold table: number of rentals per customer and rental date, built as a one-off
-- snapshot of the silver fact table.
-- NOTE(review): CustomerID is filled from customer_key (the dimension key), not from
-- customer_id — confirm that this is the intended column.
CREATE OR REPLACE TABLE sakila_dlh.fact_rentals_by_customer_gold AS (
  SELECT 
      customer_key AS CustomerID,
      customer_last_name AS LastName,
      customer_first_name AS FirstName,
      rental_date AS RentalDate,
      COUNT(rental_id) AS RentalCount
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY CustomerID, LastName, FirstName, RentalDate
  ORDER BY RentalCount DESC
);

-- The row order of a plain table scan is not guaranteed, so sort explicitly to show
-- the highest rental counts first.
SELECT * FROM sakila_dlh.fact_rentals_by_customer_gold
ORDER BY RentalCount DESC;


CustomerID,LastName,FirstName,RentalDate,RentalCount
368,ARCE,HARRY,2005-05-25,2
557,GAFFNEY,FELIX,2005-05-27,2
77,BENNETT,JANE,2005-05-28,2
44,TURNER,MARIE,2005-05-25,2
551,BARBEE,CLAYTON,2005-05-29,2
47,PARKER,FRANCES,2005-05-26,2
287,MILES,BECKY,2005-05-28,2
79,BARNES,RACHEL,2005-05-30,1
365,SCHWARZ,BRUCE,2005-05-25,1
297,RHODES,SHERRI,2005-05-25,1


This query will create another gold table that displays the most rented films along with their description, release year, rental rate, and the number of times each film has been rented.

In [0]:
%sql

-- Gold table: number of rentals per film, together with the film's description,
-- release year and rental rate, built as a one-off snapshot of the silver fact table.
CREATE OR REPLACE TABLE sakila_dlh.fact_rentals_by_film_gold AS (
  SELECT 
    film_title AS FilmTitle,
    film_description AS Description,
    film_release_year AS ReleaseYear,
    film_rental_rate AS RentalRate,
    COUNT(rental_id) AS RentalCount
  FROM 
      sakila_dlh.fact_rentals_silver
  GROUP BY 
      FilmTitle, Description, ReleaseYear, RentalRate
  ORDER BY 
      RentalCount DESC
);

-- The row order of a plain table scan is not guaranteed, so sort explicitly to show
-- the most rented films first.
SELECT * FROM sakila_dlh.fact_rentals_by_film_gold
ORDER BY RentalCount DESC;


FilmTitle,Description,ReleaseYear,RentalRate,RentalCount
BOOGIE AMELIE,A Lacklusture Character Study of a Husband And a Sumo Wrestler who must Succumb a Technical Writer in The Gulf of Mexico,2006,4.99,4
CENTER DINOSAUR,A Beautiful Character Study of a Sumo Wrestler And a Dentist who must Find a Dog in California,2006,4.99,3
ANTHEM LUKE,A Touching Panorama of a Waitress And a Woman who must Outrace a Dog in An Abandoned Amusement Park,2006,4.99,3
CONFIDENTIAL INTERVIEW,A Stunning Reflection of a Cat And a Woman who must Find a Astronaut in Ancient Japan,2006,4.99,3
CANDLES GRAPES,A Fanciful Character Study of a Monkey And a Explorer who must Build a Astronaut in An Abandoned Fun House,2006,4.99,3
BARBARELLA STREETCAR,A Awe-Inspiring Story of a Feminist And a Cat who must Conquer a Dog in A Monastery,2006,2.99,3
BUTTERFLY CHOCOLAT,A Fateful Story of a Girl And a Composer who must Conquer a Husband in A Shark Tank,2006,0.99,3
CELEBRITY HORN,A Amazing Documentary of a Secret Agent And a Astronaut who must Vanquish a Hunter in A Shark Tank,2006,0.99,3
COAST RAINBOW,A Astounding Documentary of a Mad Cow And a Pioneer who must Challenge a Butler in The Sahara Desert,2006,0.99,3
BUCKET BROTHERHOOD,A Amazing Display of a Girl And a Womanizer who must Succumb a Lumberjack in A Baloon Factory,2006,4.99,3


#### Clean up the File System

In [0]:
# Stop every streaming query that is still running (the bronze and silver writers and any
# streaming display) before their checkpoint and table files are deleted underneath them.
for active_query in spark.streams.active:
    active_query.stop()

# Recursively delete the project folder (same effect as `%fs rm -r /FileStore/final_data/`).
# NOTE(review): this folder also contains data/batch and data/stream, so the source files
# must be uploaded again before the notebook can be re-run.
dbutils.fs.rm("/FileStore/final_data/", True)