***Section I: Prerequisites***
1.0. Import Required **Libraries**

In [0]:
import os
import json
import pymongo
from pymongo import MongoClient
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType


#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysql-dha5sm.mysql.database.azure.com" #Take from server
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "dha5sm",
  "password" : "iTri2Hard!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "samircluster.o9mqm"
atlas_database_name = "sakila" 
atlas_user_name = "myuser"
atlas_password = "Passw0rd1234"

# Data Files (JSON) Information ###############################

dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/final"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}"

rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_output_gold   = f"{database_dir}/fact_rental/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/final/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-dha5sm.mysql.database.azure.com:3306/sakila", --Replace with your Server Name 
  dbtable "dim_date",
  user "dha5sm",    --Replace with your User Name
  password "iTri2Hard!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-dha5sm.mysql.database.azure.com:3306/sakila", --Replace with your Server Name 
  dbtable "customer",
  user "dha5sm",    --Replace with your User Name
  password "iTri2Hard!"  --Replace with you password
)
-- Create a Temporary View named "view_customer" that extracts data from your MySQL Northwind database.

In [0]:
%sql
USE DATABASE sakila_dlh;
-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_customer"

CREATE OR REPLACE TABLE sakila_dlh.customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/final/sakila_dlh/customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.customer;

col_name,data_type,comment
customer_id,int,
store_id,tinyint,
first_name,varchar(45),
last_name,varchar(45),
email,varchar(50),
address_id,int,
active,boolean,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/final/data/batch'

path,name,size,modificationTime
dbfs:/FileStore/final/data/batch/customer.csv,customer.csv,61425,1733324204000
dbfs:/FileStore/final/data/batch/customer.json,customer.json,160173,1733324204000
dbfs:/FileStore/final/data/batch/dim_date.csv,dim_date.csv,138125,1733324205000
dbfs:/FileStore/final/data/batch/dim_date.json,dim_date.json,663793,1733324205000
dbfs:/FileStore/final/data/batch/film.csv,film.csv,203420,1733324205000
dbfs:/FileStore/final/data/batch/film.json,film.json,462531,1733324205000
dbfs:/FileStore/final/data/batch/staff.csv,staff.csv,309,1733324205000
dbfs:/FileStore/final/data/batch/staff.json,staff.json,595,1733324205000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/final/data/batch'
json_files = {"customer" : 'customer.json'
              , "film" : 'film.json'}
# , "staff" : 'staff.json'

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f4c18794300>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "myuser"
val pwd = "Passw0rd1234"
val clusterName = "samircluster.o9mqm"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila")
.option("collection", "customer")
.option("uri", atlas_uri).load()
.select("customer_id","store_id","first_name","last_name","email","address_id","active","create_date","last_update")

display(df_customer.limit(8))

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14 22:04:36,2006-02-15 04:57:20
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14 22:04:36,2006-02-15 04:57:20
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
create_date,string,
last_update,string,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


##### 2.4.1 Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila")
.option("collection", "film")
.option("uri", atlas_uri).load()
.select("film_id","title","release_year","rental_duration","rental_rate","length","replacement_cost")

display(df_film.limit(8))

film_id,title,release_year,rental_duration,rental_rate,length,replacement_cost
1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99
2,ACE GOLDFINGER,2006,3,4.99,48,12.99
3,ADAPTATION HOLES,2006,7,2.99,50,18.99
4,AFFAIR PREJUDICE,2006,5,2.99,117,26.99
5,AFRICAN EGG,2006,6,2.99,130,22.99
6,AGENT TRUMAN,2006,3,2.99,169,17.99
7,AIRPLANE SIERRA,2006,6,4.99,62,28.99
8,AIRPORT POLLOCK,2006,6,4.99,54,15.99


In [0]:
%scala
df_film.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film

col_name,data_type,comment
film_id,int,
title,string,
release_year,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
,,
# Delta Statistics Columns,,
Column Names,"film_id, rental_rate, rental_duration, release_year, replacement_cost, length, title",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_id,title,release_year,rental_duration,rental_rate,length,replacement_cost
1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99
2,ACE GOLDFINGER,2006,3,4.99,48,12.99
3,ADAPTATION HOLES,2006,7,2.99,50,18.99
4,AFFAIR PREJUDICE,2006,5,2.99,117,26.99
5,AFRICAN EGG,2006,6,2.99,130,22.99


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
staff_csv = f"{batch_dir}/staff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16Z
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16Z


In [0]:
df_staff.printSchema()

root
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- picture: string (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
picture,string,
email,string,
store_id,int,
active,int,
username,string,
password,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 2;

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16Z
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16Z


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,customer,False
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_staff,False
,_sqldf,True
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", rental_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("rental_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview)

In [0]:
%sql
SELECT * FROM rental_bronze_tempview

Customer_ID,Film_ID,Late_Fee,Rental_Date,Rental_Duration,Rental_ID,Rental_Revenue,Return_Date,Staff_ID,Store_ID,_rescued_data,receipt_time,source_file


In [0]:
(spark.table("rental_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f4c1832b190>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rental_bronze")
  .createOrReplaceTempView("rental_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview LIMIT 10

Customer_ID,Film_ID,Late_Fee,Rental_Date,Rental_Duration,Rental_ID,Rental_Revenue,Return_Date,Staff_ID,Store_ID,_rescued_data,receipt_time,source_file
194,995,14.97,2005-05-28 23:00:08,8,677,7.99,2005-06-05 19:11:08,2,1,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
269,818,9.98,2005-05-28 23:15:48,6,678,6.99,2005-06-03 04:43:48,2,1,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
526,201,5.98,2005-05-28 23:24:57,9,679,4.99,2005-06-06 21:59:57,2,1,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
482,901,1.98,2005-05-28 23:27:26,5,680,2.99,2005-06-02 02:28:26,2,1,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
531,735,0.0,2005-05-28 23:39:44,4,681,2.99,2005-06-01 01:42:44,1,2,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
148,694,0.0,2005-05-28 23:53:18,1,682,4.99,2005-05-29 19:14:18,2,1,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
197,443,0.0,2005-05-29 00:09:48,4,683,2.99,2005-06-02 04:27:48,1,2,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
461,711,9.98,2005-05-29 00:13:15,6,684,6.99,2005-06-04 21:26:15,2,2,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
172,304,11.96,2005-05-29 00:17:51,7,685,6.99,2005-06-05 05:32:51,2,1,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json
411,532,0.0,2005-05-29 00:27:10,1,686,4.99,2005-05-30 02:29:10,1,2,,2024-12-05T01:42:22.683Z,dbfs:/FileStore/final/data/stream/fact_rental_3.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview

col_name,data_type,comment
Customer_ID,bigint,
Film_ID,bigint,
Late_Fee,double,
Rental_Date,string,
Rental_Duration,bigint,
Rental_ID,bigint,
Rental_Revenue,double,
Return_Date,string,
Staff_ID,bigint,
Store_ID,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS (
  SELECT 
      f.Rental_ID,
      f.Customer_ID,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      f.Film_ID,
      fl.title AS film_title,
      fl.rental_duration AS film_rental_duration,
      fl.rental_rate AS film_rental_rate,
      fl.replacement_cost AS film_replacement_cost,
      f.Staff_ID,
      s.first_name AS staff_first_name,
      s.last_name AS staff_last_name,
      f.Store_ID,
      f.Rental_Date AS rental_date,
      d1.day_name_of_week AS rental_day_name_of_week,
      d1.day_of_month AS rental_day_of_month,
      d1.weekday_weekend AS rental_weekday_weekend,
      d1.month_name AS rental_month_name,
      d1.calendar_quarter AS rental_quarter,
      d1.calendar_year AS rental_year,
      f.Return_Date AS return_date,
      d2.day_name_of_week AS return_day_name_of_week,
      d2.day_of_month AS return_day_of_month,
      d2.weekday_weekend AS return_weekday_weekend,
      d2.month_name AS return_month_name,
      d2.calendar_quarter AS return_quarter,
      d2.calendar_year AS return_year,
      f.Rental_Duration,
      f.Rental_Revenue,
      f.Late_Fee
  FROM rental_silver_tempview AS f
  INNER JOIN sakila_dlh.dim_customer AS c
      ON c.customer_id = f.Customer_ID
  INNER JOIN sakila_dlh.dim_film AS fl
      ON fl.film_id = f.Film_ID
  INNER JOIN sakila_dlh.dim_staff AS s
      ON s.staff_id = f.Staff_ID
  LEFT OUTER JOIN sakila_dlh.dim_date AS d1
      ON d1.date_key = CAST(DATE_FORMAT(f.Rental_Date, 'yyyyMMdd') AS INT)
  LEFT OUTER JOIN sakila_dlh.dim_date AS d2
      ON d2.date_key = CAST(DATE_FORMAT(f.Return_Date, 'yyyyMMdd') AS INT)
);


In [0]:
%sql
SELECT * FROM fact_rental_silver_tempview LIMIT 5

Rental_ID,Customer_ID,customer_first_name,customer_last_name,Film_ID,film_title,film_rental_duration,film_rental_rate,film_replacement_cost,Staff_ID,staff_first_name,staff_last_name,Store_ID,rental_date,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,Rental_Duration,Rental_Revenue,Late_Fee
677,194,KRISTEN,CHAVEZ,995,YENTL IDAHO,5,4.99,11.99,2,Jon,Stephens,1,2005-05-28 23:00:08,Saturday,28,Weekend,May,2,2005,2005-06-05 19:11:08,Sunday,5,Weekend,June,2,2005,8,7.99,14.97
678,269,CASSANDRA,WALTERS,818,SOMETHING DUCK,4,4.99,17.99,2,Jon,Stephens,1,2005-05-28 23:15:48,Saturday,28,Weekend,May,2,2005,2005-06-03 04:43:48,Friday,3,Weekday,June,2,2005,6,6.99,9.98
679,526,KARL,SEAL,201,CYCLONE FAMILY,7,2.99,18.99,2,Jon,Stephens,1,2005-05-28 23:24:57,Saturday,28,Weekend,May,2,2005,2005-06-06 21:59:57,Monday,6,Weekday,June,2,2005,9,4.99,5.98
680,482,MAURICE,CRAWLEY,901,TRACY CIDER,3,0.99,29.99,2,Jon,Stephens,1,2005-05-28 23:27:26,Saturday,28,Weekend,May,2,2005,2005-06-02 02:28:26,Thursday,2,Weekday,June,2,2005,5,2.99,1.98
681,531,JAMIE,WAUGH,735,ROBBERS JOON,7,2.99,26.99,1,Mike,Hillyer,2,2005-05-28 23:39:44,Saturday,28,Weekend,May,2,2005,2005-06-01 01:42:44,Wednesday,1,Weekday,June,2,2005,4,2.99,0.0


In [0]:
(spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f4bf072b950>

In [0]:
%sql
SELECT * FROM fact_rental_silver limit 5

Rental_ID,Customer_ID,customer_first_name,customer_last_name,Film_ID,film_title,film_rental_duration,film_rental_rate,film_replacement_cost,Staff_ID,staff_first_name,staff_last_name,Store_ID,rental_date,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,Rental_Duration,Rental_Revenue,Late_Fee
677,194,KRISTEN,CHAVEZ,995,YENTL IDAHO,5,4.99,11.99,2,Jon,Stephens,1,2005-05-28 23:00:08,Saturday,28,Weekend,May,2,2005,2005-06-05 19:11:08,Sunday,5,Weekend,June,2,2005,8,7.99,14.97
678,269,CASSANDRA,WALTERS,818,SOMETHING DUCK,4,4.99,17.99,2,Jon,Stephens,1,2005-05-28 23:15:48,Saturday,28,Weekend,May,2,2005,2005-06-03 04:43:48,Friday,3,Weekday,June,2,2005,6,6.99,9.98
679,526,KARL,SEAL,201,CYCLONE FAMILY,7,2.99,18.99,2,Jon,Stephens,1,2005-05-28 23:24:57,Saturday,28,Weekend,May,2,2005,2005-06-06 21:59:57,Monday,6,Weekday,June,2,2005,9,4.99,5.98
680,482,MAURICE,CRAWLEY,901,TRACY CIDER,3,0.99,29.99,2,Jon,Stephens,1,2005-05-28 23:27:26,Saturday,28,Weekend,May,2,2005,2005-06-02 02:28:26,Thursday,2,Weekday,June,2,2005,5,2.99,1.98
681,531,JAMIE,WAUGH,735,ROBBERS JOON,7,2.99,26.99,1,Mike,Hillyer,2,2005-05-28 23:39:44,Saturday,28,Weekend,May,2,2005,2005-06-01 01:42:44,Wednesday,1,Weekday,June,2,2005,4,2.99,0.0


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.fact_rental_silver

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `northwind_dlh`.`fact_rental_silver` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 1 pos 18;
'DescribeRelation true, [col_name#1260275, data_type#1260276, comment#1260277]
+- 'UnresolvedTableOrView [northwind_dlh, fact_rental_silver], DESCRIBE TABLE, true

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:90)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:214)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:197)
	at org.apache.spark.sql.catalyst.trees.Tree

##### 6.3. Gold Table: Perform Aggregations


###Gold Table 1: Monthly Rentals by Customer

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rentals_by_customer_gold AS (
  SELECT 
      Customer_ID AS CustomerID,
      customer_last_name AS LastName,
      customer_first_name AS FirstName,
      rental_month_name AS RentalMonth,
      COUNT(Film_ID) AS FilmRentalCount
  FROM sakila_dlh.fact_rental_silver
  GROUP BY CustomerID, LastName, FirstName, RentalMonth
  ORDER BY FilmRentalCount DESC
);

-- Verify the output
SELECT * FROM sakila_dlh.fact_monthly_rentals_by_customer_gold limit 15;


CustomerID,LastName,FirstName,RentalMonth,FilmRentalCount
197,PETERS,SUE,May,8
506,SEWARD,LESLIE,May,7
109,WEST,EDNA,May,7
269,WALTERS,CASSANDRA,May,7
371,POULIN,BILLY,May,6
274,JENNINGS,NAOMI,May,6
245,DAY,COURTNEY,May,6
19,MARTINEZ,RUTH,May,6
53,MORRIS,HEATHER,May,6
596,FORSYTHE,ENRIQUE,May,6


###Gold Table 2: Total Revenue by Customer

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_total_revenue_by_customer_gold AS (
  SELECT 
      Customer_ID AS CustomerID,
      customer_last_name AS LastName,
      customer_first_name AS FirstName,
      SUM(Rental_Revenue) AS TotalRevenue,
      SUM(Late_Fee) AS TotalLateFees
  FROM sakila_dlh.fact_rental_silver
  GROUP BY CustomerID, LastName, FirstName
  ORDER BY TotalRevenue DESC
);

-- Verify the output
SELECT * FROM sakila_dlh.fact_total_revenue_by_customer_gold limit 15;

CustomerID,LastName,FirstName,TotalRevenue,TotalLateFees
239,ROMERO,MINNIE,33.940000000000005,31.9
246,MENDOZA,MARIAN,30.950000000000003,37.86
245,DAY,COURTNEY,28.94,42.89
506,SEWARD,LESLIE,28.93,29.82
109,WEST,EDNA,27.93,26.93
105,SULLIVAN,DAWN,26.96,44.91
311,TROUT,PAUL,26.950000000000003,23.84
429,ISBELL,FREDERICK,26.95,23.92
53,MORRIS,HEATHER,26.940000000000005,32.93
19,MARTINEZ,RUTH,26.94,36.91


###Gold Table 3: Monthly Revenue Trends

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_revenue_gold AS (
  SELECT 
      rental_month_name AS RentalMonth,
      rental_year AS RentalYear,
      SUM(Rental_Revenue) AS TotalRevenue,
      SUM(Late_Fee) AS TotalLateFees
  FROM fact_rental_silver
  GROUP BY RentalMonth, RentalYear
  ORDER BY RentalYear, 
           CASE 
               WHEN RentalMonth = 'January' THEN 1
               WHEN RentalMonth = 'February' THEN 2
               WHEN RentalMonth = 'March' THEN 3
               WHEN RentalMonth = 'April' THEN 4
               WHEN RentalMonth = 'May' THEN 5
               WHEN RentalMonth = 'June' THEN 6
               WHEN RentalMonth = 'July' THEN 7
               WHEN RentalMonth = 'August' THEN 8
               WHEN RentalMonth = 'September' THEN 9
               WHEN RentalMonth = 'October' THEN 10
               WHEN RentalMonth = 'November' THEN 11
               WHEN RentalMonth = 'December' THEN 12
           END
);

-- Verify the output
SELECT * FROM sakila_dlh.fact_monthly_revenue_gold;

RentalMonth,RentalYear,TotalRevenue,TotalLateFees
May,2005,4824.449999999929,4036.609999999995
