## DS 2002 Project 2

### Section I: Setting Up

#### Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "nhk9hb-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "nhk9hb",
  "password" : "Passw0rd123",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.mnm8h"
atlas_database_name = "sakila_dw2"
atlas_user_name = "nhk9hb"
atlas_password = "B3ZgCYqqw3RFNFCq"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section 2: Populate Dimensions by Ingesting Reference (Cold-path) Data 

#### Create a New Databricks Metadata Database

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Project 2 Database"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project 2");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://nhk9hb-mysql.mysql.database.azure.com:3306/sakila_dw?useSSL=true&requireSSL=true",
  dbtable "dim_date",
  user "nhk9hb", 
  password "Passw0rd123" 
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Staff Dimension Data from an Azure MySQL database

In [0]:
%sql
-- Creating temporary view
CREATE OR REPLACE TEMPORARY VIEW view_staff
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://nhk9hb-mysql.mysql.database.azure.com:3306/sakila_dw?useSSL=true&requireSSL=true",
  dbtable "dim_staff",
  user "nhk9hb", 
  password "Passw0rd123"
)

In [0]:
%sql
-- Creating new table

USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_staff
COMMENT "Staff Dimension Table"
LOCATION "dbfs:/FileStore/project_data/sakila_dlh/dim_staff"
AS SELECT * FROM view_staff

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,int,
staff_id,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
active,bigint,
username,varchar(65535),
,,
# Delta Statistics Columns,,
Column Names,"first_name, email, username, staff_key, last_name, staff_id, active",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5

staff_key,staff_id,first_name,last_name,email,active,username
1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike
2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon


#### Fetch Reference Data from a MongoDB Atlas Database

##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/project_data/batch/sakila_customers.json,sakila_customers.json,211911,1733702583000
dbfs:/FileStore/project_data/batch/sakila_films.csv,sakila_films.csv,187293,1733702583000


##### Create a New MongoDB Database and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/project_data/batch'
json_files = {"customers" : 'sakila_customers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7fa6898f3380>

##### Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "nhk9hb"
val pwd = "B3ZgCYqqw3RFNFCq"
val clusterName = "cluster0.mnm8h"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customers = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "customers").load()
.select("customer_key","first_name","last_name","email","active","address","district","postal_code","phone","create_date_key","last_update_key")

display(df_customers)

customer_key,first_name,last_name,email,active,address,district,postal_code,phone,create_date_key,last_update_key
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1913 Hanoi Way,Nagasaki,35200,28303384290,20060214,20060215
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,1121 Loja Avenue,California,17886,838635286649,20060214,20060215
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1,692 Joliet Street,Attika,83579,448477190408,20060214,20060215
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1,1566 Inegöl Manor,Mandalay,53561,705814003527,20060214,20060215
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1,53 Idfu Parkway,Nantou,42399,10655648674,20060214,20060215
6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,1,1795 Santiago de Compostela Way,Texas,18743,860452626434,20060214,20060215
7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,1,900 Santiago de Compostela Parkway,Central Serbia,93896,716571220373,20060214,20060215
8,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,1,478 Joliet Way,Hamilton,77948,657282285970,20060214,20060215
9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,1,613 Korolev Drive,Masqat,45844,380657522649,20060214,20060215
10,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,1,1531 Salé Drive,Esfahan,53628,648856936185,20060214,20060215


In [0]:
%scala
df_customers.printSchema()

##### Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customers.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
first_name,string,
last_name,string,
email,string,
active,int,
address,string,
district,string,
postal_code,string,
phone,string,
create_date_key,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,first_name,last_name,email,active,address,district,postal_code,phone,create_date_key,last_update_key
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1913 Hanoi Way,Nagasaki,35200,28303384290,20060214,20060215
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,1121 Loja Avenue,California,17886,838635286649,20060214,20060215
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1,692 Joliet Street,Attika,83579,448477190408,20060214,20060215
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1,1566 Inegöl Manor,Mandalay,53561,705814003527,20060214,20060215
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1,53 Idfu Parkway,Nantou,42399,10655648674,20060214,20060215


#### Fetch Data from a File System, using PySpark to Read from a CSV

In [0]:
film_csv = f"{batch_dir}/sakila_films.csv"

df_film = spark.read.format('csv').options(header='true', inferSchema='true').load(film_csv)
display(df_film)

film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_key
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215
3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",20060215
4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",20060215
5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,6,2.99,130,22.99,G,Deleted Scenes,20060215
6,6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,3,2.99,169,17.99,PG,Deleted Scenes,20060215
7,7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes",20060215
8,8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,6,4.99,54,15.99,R,Trailers,20060215
9,9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes",20060215
10,10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes",20060215


In [0]:
df_film.printSchema()

root
 |-- film_key: integer (nullable = true)
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- length: integer (nullable = true)
 |-- replacement_cost: double (nullable = true)
 |-- rating: string (nullable = true)
 |-- special_features: string (nullable = true)
 |-- last_update_key: integer (nullable = true)



In [0]:
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,int,
film_id,int,
title,string,
description,string,
release_year,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5;

film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_key
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215
3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",20060215
4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",20060215
5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,6,2.99,130,22.99,G,Deleted Scenes,20060215


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_staff,False
,_sqldf,True
,view_date,True
,view_staff,True


### Section 3: Integrate Reference Data with Real-Time Data

#### Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
-- Add Metadata for Traceability
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_key,film_key,inventory_id,rental_date_key,rental_id,rental_key,return_date_key,staff_key,_rescued_data,receipt_time,source_file
332,385,1774,20050531,1000,1000,20050608,2,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
64,328,1498,20050531,1001,1001,20050606,2,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
397,155,709,20050531,1002,1002,20050606,1,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
161,26,133,20050531,1003,1003,20050602,2,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
565,348,1588,20050531,1004,1004,20050601,1,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
551,872,4006,20050531,1005,1005,20050604,2,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
222,759,3461,20050531,1006,1006,20050602,1,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
24,698,3185,20050531,1007,1007,20050607,2,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
599,203,914,20050531,1008,1008,20050601,2,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
485,554,2523,20050531,1009,1009,20050603,1,,2024-12-09T00:04:53.553Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fa64cc7fe10>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_key,film_key,inventory_id,rental_date_key,rental_id,rental_key,return_date_key,staff_key,_rescued_data,receipt_time,source_file
332,385,1774,20050531,1000,1000,20050608,2,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
64,328,1498,20050531,1001,1001,20050606,2,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
397,155,709,20050531,1002,1002,20050606,1,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
161,26,133,20050531,1003,1003,20050602,2,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
565,348,1588,20050531,1004,1004,20050601,1,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
551,872,4006,20050531,1005,1005,20050604,2,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
222,759,3461,20050531,1006,1006,20050602,1,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
24,698,3185,20050531,1007,1007,20050607,2,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
599,203,914,20050531,1008,1008,20050601,2,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json
485,554,2523,20050531,1009,1009,20050603,1,,2024-12-09T00:05:14.25Z,dbfs:/FileStore/project_data/stream/rentals/sakila_fact_rentals03.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_key,bigint,
film_key,bigint,
inventory_id,bigint,
rental_date_key,bigint,
rental_id,bigint,
rental_key,bigint,
return_date_key,bigint,
staff_key,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_key,
  r.inventory_id,
  --f.film_key,
  f.title AS film_title,
  f.description AS film_description,
  f.release_year AS film_release_year,
  f.rental_duration AS film_rental_duration,
  f.rental_rate AS film_rental_rate,
  f.length AS film_length,
  f.replacement_cost AS film_replacement_cost,
  f.rating AS film_rating,
  f.special_features AS film_special_features,
  --r.customer_key,
  c.first_name AS customer_first_name,
  c.last_name AS customer_last_name,
  c.email AS customer_email,
  c.active AS customer_active,
  c.address AS customer_address,
  c.district AS customer_district,
  c.postal_code AS customer_postal_code,
  c.phone AS customer_phone,
  --r.staff_key,
  s.first_name AS staff_first_name,
  s.last_name AS staff_last_name,
  s.email AS staff_email,
  s.active AS staff_active,
  s.username AS staff_username,
  --r.rental_date_key,
  rd.day_name_of_week AS rental_day_name_of_week,
  rd.day_of_month AS rental_day_of_month,
  rd.weekday_weekend AS rental_weekday_weekend,
  rd.month_name AS rental_month_name,
  rd.calendar_quarter AS rental_calendar_quarter,
  rd.calendar_year AS rental_calendar_year,
  --r.return_date_key,
  rr.day_name_of_week AS return_day_name_of_week,
  rr.day_of_month AS return_day_of_month,
  rr.weekday_weekend AS return_weekday_weekend,
  rr.month_name AS return_month_name,
  rr.calendar_quarter AS return_calendar_quarter,
  rr.calendar_year AS return_calendar_year
  FROM 
    rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
    ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
    ON s.staff_key = r.staff_key
  INNER JOIN sakila_dlh.dim_film AS f
    ON f.film_key = r.film_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
    ON rd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rr
    ON rr.date_key = r.return_date_key
);

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fa64cbf6090>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

rental_key,inventory_id,film_title,film_description,film_release_year,film_rental_duration,film_rental_rate,film_length,film_replacement_cost,film_rating,film_special_features,customer_first_name,customer_last_name,customer_email,customer_active,customer_address,customer_district,customer_postal_code,customer_phone,staff_first_name,staff_last_name,staff_email,staff_active,staff_username,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year
573,4020,TALENTED HOMICIDE,A Lacklusture Panorama of a Dentist And a Forensic Psychologist who must Outrace a Pioneer in A U-Boat,2006,6,0.99,173,9.99,PG,"Commentaries,Deleted Scenes,Behind the Scenes",MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1913 Hanoi Way,Nagasaki,35200,28303384290,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Saturday,28,Weekend,May,2,2005,Friday,3,Weekday,June,2,2005
320,1090,DOORS PRESIDENT,A Awe-Inspiring Display of a Squirrel And a Woman who must Overcome a Boy in The Gulf of Mexico,2006,3,4.99,49,22.99,NC-17,"Trailers,Commentaries,Deleted Scenes,Behind the Scenes",PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,1121 Loja Avenue,California,17886,838635286649,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon,Friday,27,Weekday,May,2,2005,Saturday,28,Weekend,May,2,2005
830,3464,SALUTE APOLLO,A Awe-Inspiring Character Study of a Boy And a Feminist who must Sink a Crocodile in Ancient China,2006,4,2.99,73,29.99,R,"Trailers,Commentaries,Deleted Scenes,Behind the Scenes",LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1,692 Joliet Street,Attika,83579,448477190408,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Sunday,29,Weekend,May,2,2005,Wednesday,1,Weekday,June,2,2005
1735,97,AMADEUS HOLY,A Emotional Display of a Pioneer And a Technical Writer who must Battle a Man in A Baloon,2006,6,0.99,113,20.99,PG,"Commentaries,Deleted Scenes,Behind the Scenes",BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1,1566 Inegöl Manor,Mandalay,53561,705814003527,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Thursday,16,Weekday,June,2,2005,Monday,20,Weekday,June,2,2005
731,4124,TOOTSIE PILOT,A Awe-Inspiring Documentary of a Womanizer And a Pastry Chef who must Kill a Lumberjack in Berlin,2006,3,0.99,157,10.99,PG,"Commentaries,Deleted Scenes,Behind the Scenes",ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1,53 Idfu Parkway,Nantou,42399,10655648674,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Sunday,29,Weekend,May,2,2005,Monday,30,Weekday,May,2,2005
916,1290,ENGLISH BULWORTH,A Intrepid Epistle of a Pastry Chef And a Pastry Chef who must Pursue a Crocodile in Ancient China,2006,3,0.99,51,18.99,PG-13,Deleted Scenes,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,1,1795 Santiago de Compostela Way,Texas,18743,860452626434,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Monday,30,Weekday,May,2,2005,Tuesday,31,Weekday,May,2,2005
975,3109,PITY BOUND,A Boring Panorama of a Feminist And a Moose who must Defeat a Database Administrator in Nigeria,2006,5,4.99,60,19.99,NC-17,Commentaries,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,1,900 Santiago de Compostela Parkway,Central Serbia,93896,716571220373,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon,Monday,30,Weekday,May,2,2005,Friday,3,Weekday,June,2,2005
866,2867,NORTHWEST POLISH,A Boring Character Study of a Boy And a A Shark who must Outrace a Womanizer in The Outback,2006,5,2.99,172,24.99,PG,Trailers,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,1,478 Joliet Way,Hamilton,77948,657282285970,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Monday,30,Weekday,May,2,2005,Wednesday,8,Weekday,June,2,2005
877,886,CRUELTY UNFORGIVEN,A Brilliant Tale of a Car And a Moose who must Battle a Dentist in Nigeria,2006,7,0.99,69,29.99,G,"Deleted Scenes,Behind the Scenes",MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,1,613 Korolev Drive,Masqat,45844,380657522649,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Monday,30,Weekday,May,2,2005,Thursday,2,Weekday,June,2,2005
1995,1866,HAWK CHILL,A Action-Packed Drama of a Mad Scientist And a Composer who must Outgun a Car in Australia,2006,5,0.99,47,12.99,PG-13,Behind the Scenes,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,1,1531 Salé Drive,Esfahan,53628,648856936185,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,Friday,17,Weekday,June,2,2005,Sunday,26,Weekend,June,2,2005


In [0]:
%sql
DESCRIBE EXTENDED fact_rentals_silver

col_name,data_type,comment
rental_key,bigint,
inventory_id,bigint,
film_title,string,
film_description,string,
film_release_year,int,
film_rental_duration,int,
film_rental_rate,double,
film_length,int,
film_replacement_cost,double,
film_rating,string,


##### Gold Table: Perform Aggregations
##### 

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_rentals_gold AS (
    SELECT film_title AS Film_Title,
        film_rating AS Film_Rating,
        film_rental_rate AS Rental_Rate,
        COUNT(rental_key) AS Rental_Count
  FROM sakila_dlh.fact_rentals_silver
  GROUP BY Film_Title, Film_Rating, Rental_Rate
  ORDER BY Rental_Count DESC
);

SELECT * FROM sakila_dlh.fact_rentals_gold;

Film_Title,Film_Rating,Rental_Rate,Rental_Count
ROCKETEER MOTHER,PG-13,0.99,9
IDOLS SNATCHERS,NC-17,2.99,8
FELLOWSHIP AUTUMN,NC-17,4.99,8
SEATTLE EXPECATIONS,PG-13,4.99,7
LOVE SUICIDES,R,0.99,7
PRINCESS GIANT,NC-17,2.99,7
TALENTED HOMICIDE,PG,0.99,7
WITCHES PANIC,NC-17,4.99,7
ROBBERS JOON,PG-13,2.99,7
RUGRATS SHAKESPEARE,PG-13,0.99,7


#### Clean up the File System

In [0]:
%fs rm -r /FileStore/project_data/