## Camila Capstone DataLakehouse Structured Streaming

Using Azure Databricks, a Date Lakehouse was designed and populated to represent a simple business process of customer sales by payment type, staff, and date.

Design Overview: 
- Date dimension
- 4 dimension tables (customers, store, film, inventory).
- 1 fact table that models the business process (rental).

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "xpa7ez-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "camilagutie15",
  "password" : "Passw0rd1234",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.pbet7"
atlas_database_name = "sakila_dw2"
atlas_user_name = "camilagutie15"
atlas_password = "Passw0rd1234"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rental_stream_dir = f"{stream_dir}/orders"

rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_output_gold   = f"{database_dir}/fact_rental/gold"


# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:

# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.

def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.

def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 CG Capstone Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 CG Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://xpa7ez-mysql.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_date",
  user "camilagutie15",    --Replace with your User Name
  password "Passw0rd1234"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Film Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_film" that extracts data from your MySQL Sakila database.


CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://xpa7ez-mysql.mysql.database.azure.com:3306/sakila",
  dbtable "film",
  user "camilagutie15",  
  password "Passw0rd1234" 
)


In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.film" using data from the view named "view_film"

CREATE OR REPLACE TABLE sakila_dlh.film
COMMENT "Film Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/view_film"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.film;

col_name,data_type,comment
film_id,int,
title,varchar(128),
description,varchar(65535),
release_year,date,
language_id,tinyint,
original_language_id,tinyint,
rental_duration,tinyint,
rental_rate,"decimal(4,2)",
length,int,
replacement_cost,"decimal(5,2)",


In [0]:
%sql
SELECT * FROM sakila_dlh.film LIMIT 5

film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006-01-01,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42Z
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006-01-01,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006-01-01,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006-01-01,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42Z
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006-01-01,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42Z


##### 1.4. Create a New Table that Sources Inventory Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_inventory" that extracts data from your MySQL Sakila database.


CREATE OR REPLACE TEMPORARY VIEW view_inventory
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://xpa7ez-mysql.mysql.database.azure.com:3306/sakila",
  dbtable "inventory",
  user "camilagutie15",  
  password "Passw0rd1234" 
)


In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.inventory" using data from the view named "view_inventory"

CREATE OR REPLACE TABLE sakila_dlh.inventory
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/view_inventory"
AS SELECT * FROM view_inventory

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.inventory;

col_name,data_type,comment
inventory_id,bigint,
film_id,int,
store_id,tinyint,
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"inventory_id, film_id, store_id, last_update",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.inventory LIMIT 5

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15T05:09:17Z
2,1,1,2006-02-15T05:09:17Z
3,1,1,2006-02-15T05:09:17Z
4,1,1,2006-02-15T05:09:17Z
5,1,2,2006-02-15T05:09:17Z


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/Sakila_Customer.csv,Sakila_Customer.csv,65037,1733596827000
dbfs:/FileStore/lab_data/retail/batch/Sakila_DimCustomer.json,Sakila_DimCustomer.json,148680,1733596681000
dbfs:/FileStore/lab_data/retail/batch/Sakila_DimDate.csv,Sakila_DimDate.csv,651415,1733596681000
dbfs:/FileStore/lab_data/retail/batch/Sakila_DimStore.json,Sakila_DimStore.json,195,1733596760000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {
    "store": "Sakila_DimStore.json"
}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

<pymongo.results.InsertManyResult at 0x7f84834f3040>

##### 2.3.1. Fetch Store Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "camilagutie15"
val pwd = "Passw0rd1234"
val clusterName = "cluster0.pbet7"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_store = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2")
.option("collection", "store")
.option("uri", atlas_uri).load()
.select("store_key", "store_id", "manager_staff_id", "address_id", "last_update_key")

display(df_store)

store_key,store_id,manager_staff_id,address_id,last_update_key
1,1,1,1,20060215
2,2,2,2,20060215


In [0]:
%scala
df_store.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Store Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_store.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.store")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.store

col_name,data_type,comment
store_key,int,
store_id,int,
manager_staff_id,int,
address_id,int,
last_update_key,int,
,,
# Delta Statistics Columns,,
Column Names,"store_id, address_id, manager_staff_id, store_key, last_update_key",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.store LIMIT 5

store_key,store_id,manager_staff_id,address_id,last_update_key
1,1,1,1,20060215
2,2,2,2,20060215


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read Customer From a CSV File

In [0]:
customer_csv = f"{batch_dir}/Sakila_Customer.csv"

df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)

display(df_customer)

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


In [0]:
df_customer.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_customer.write.format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .mode("overwrite") \
    .saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.store LIMIT 5;

store_key,store_id,manager_staff_id,address_id,last_update_key
1,1,1,1,20060215
2,2,2,2,20060215


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,film,False
sakila_dlh,inventory,False
sakila_dlh,store,False
,_sqldf,True
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True


### Section III: Integrate Reference Data with Real-Time Fact_Rental Data
#### 4.0. Use AutoLoader to Process Streaming (Hot Path) Fact Rental Data 
##### 4.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_rental_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "create_date_key BIGINT")
 #.option("cloudFiles.schemaHints", "film_key DECIMAL")
 #.option("cloudFiles.schemaHints", "last_update_key DECIMAL")
 #.option("cloudFiles.schemaHints", "first_name STRING")
 #.option("cloudFiles.schemaHints", "last_name STRING")
 #.option("cloudFiles.schemaHints", "email STRING")
 .option("cloudFiles.schemaLocation", rental_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_stream_dir)
 .createOrReplaceTempView("rental_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_raw_tempview
)

In [0]:
%sql
SELECT * FROM rental_bronze_tempview

customer_id,inventory_id,last_update,rental_date,rental_date_key,rental_id,return_date,return_date_key,staff_id,_rescued_data,receipt_time,source_file
470,2538,2006-02-15 21:30:53,2005-07-31 17:54:35,20050731,10006,2005-08-02 20:40:35,20050802,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
445,293,2006-02-15 21:30:53,2005-07-31 17:54:58,20050731,10007,2005-08-05 17:24:58,20050805,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
91,2589,2006-02-15 21:30:53,2005-07-31 17:59:36,20050731,10008,2005-08-03 22:43:36,20050803,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
437,4441,2006-02-15 21:30:53,2005-07-31 18:00:28,20050731,10009,2005-08-08 22:24:28,20050808,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
373,2655,2006-02-15 21:30:53,2005-07-31 18:01:36,20050731,10010,2005-08-07 20:27:36,20050807,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
128,606,2006-02-15 21:30:53,2005-07-31 18:02:41,20050731,10011,2005-08-08 17:04:41,20050808,1,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
513,2554,2006-02-15 21:30:53,2005-07-31 18:06:06,20050731,10012,2005-08-09 16:47:06,20050809,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
377,2364,2006-02-15 21:30:53,2005-07-31 18:08:21,20050731,10013,2005-08-08 13:22:21,20050808,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
443,2344,2006-02-15 21:30:53,2005-07-31 18:10:56,20050731,10014,2005-08-02 23:36:56,20050802,1,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
153,67,2006-02-15 21:30:53,2005-07-31 18:11:17,20050731,10015,2005-08-03 15:48:17,20050803,2,,2024-12-07T18:42:41.066Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json


In [0]:
(spark.table("rental_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f8448d0a610>

##### 4.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rental_bronze")
  .createOrReplaceTempView("rental_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview

customer_id,inventory_id,last_update,rental_date,rental_date_key,rental_id,return_date,return_date_key,staff_id,_rescued_data,receipt_time,source_file
470,2538,2006-02-15 21:30:53,2005-07-31 17:54:35,20050731,10006,2005-08-02 20:40:35,20050802,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
445,293,2006-02-15 21:30:53,2005-07-31 17:54:58,20050731,10007,2005-08-05 17:24:58,20050805,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
91,2589,2006-02-15 21:30:53,2005-07-31 17:59:36,20050731,10008,2005-08-03 22:43:36,20050803,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
437,4441,2006-02-15 21:30:53,2005-07-31 18:00:28,20050731,10009,2005-08-08 22:24:28,20050808,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
373,2655,2006-02-15 21:30:53,2005-07-31 18:01:36,20050731,10010,2005-08-07 20:27:36,20050807,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
128,606,2006-02-15 21:30:53,2005-07-31 18:02:41,20050731,10011,2005-08-08 17:04:41,20050808,1,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
513,2554,2006-02-15 21:30:53,2005-07-31 18:06:06,20050731,10012,2005-08-09 16:47:06,20050809,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
377,2364,2006-02-15 21:30:53,2005-07-31 18:08:21,20050731,10013,2005-08-08 13:22:21,20050808,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
443,2344,2006-02-15 21:30:53,2005-07-31 18:10:56,20050731,10014,2005-08-02 23:36:56,20050802,1,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json
153,67,2006-02-15 21:30:53,2005-07-31 18:11:17,20050731,10015,2005-08-03 15:48:17,20050803,2,,2024-12-07T18:42:44.138Z,dbfs:/FileStore/lab_data/retail/stream/orders/Sakila_FactRental03.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
last_update,string,
rental_date,string,
rental_date_key,string,
rental_id,bigint,
return_date,string,
return_date_key,string,
staff_id,bigint,
_rescued_data,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS (
  SELECT r.customer_id,
  c.first_name,
  c.last_name,
  c.email,
  c.store_id AS customer_store_id,
  f.title,
  f.release_year,  
  f.rating,
  f.rental_duration, 
  f.rental_rate, 
  i.inventory_id,
  s.store_id AS film_store_id, 
  r.rental_date_key,
  rd.day_name_of_week AS rental_day_of_week,
  rd.day_of_month AS rental_day_of_month,
  rd.weekday_weekend AS rental_weekday_weekend,
  rd.month_name AS rental_month_name,
  rd.calendar_quarter AS rental_calendar_quarter,
  rd.calendar_year AS rental_calendar_year,
  r.return_date_key,
  rtd.day_name_of_week AS return_day_of_week,
  rtd.day_of_month AS return_day_of_month,
  rtd.weekday_weekend AS return_weekday_weekend,
  rtd.month_name AS return_month_name,
  rtd.calendar_quarter AS return_calendar_quarter,
  rtd.calendar_year AS return_calendar_year
  FROM rental_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_id = r.customer_id
  INNER JOIN sakila_dlh.inventory AS i
  ON i.inventory_id = r.inventory_id
  INNER JOIN sakila_dlh.film AS f
  ON f.film_id = i.film_id
  INNER JOIN sakila_dlh.store AS s
  ON s.store_id = i.store_id
  INNER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rtd
  ON rtd.date_key = r.return_date_key
)  

In [0]:
%sql

select * from fact_rental_silver_tempview

customer_id,first_name,last_name,email,customer_store_id,title,release_year,rating,rental_duration,rental_rate,inventory_id,film_store_id,rental_date_key,rental_day_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,return_date_key,return_day_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year
470,GORDON,ALLARD,GORDON.ALLARD@sakilacustomer.org,1,MALTESE HOPE,2006-01-01,PG-13,6,4.99,2538,2,20050731,Sunday,31,Weekend,July,3,2005,20050802,Tuesday,2,Weekday,August,3,2005
445,MICHEAL,FORMAN,MICHEAL.FORMAN@sakilacustomer.org,1,BENEATH RUSH,2006-01-01,NC-17,6,0.99,293,1,20050731,Sunday,31,Weekend,July,3,2005,20050805,Friday,5,Weekday,August,3,2005
91,LOIS,BUTLER,LOIS.BUTLER@sakilacustomer.org,2,MEMENTO ZOOLANDER,2006-01-01,NC-17,4,4.99,2589,2,20050731,Sunday,31,Weekend,July,3,2005,20050803,Wednesday,3,Weekday,August,3,2005
437,RANDALL,NEUMANN,RANDALL.NEUMANN@sakilacustomer.org,2,WHALE BIKINI,2006-01-01,PG-13,4,4.99,4441,1,20050731,Sunday,31,Weekend,July,3,2005,20050808,Monday,8,Weekday,August,3,2005
373,LOUIS,LEONE,LOUIS.LEONE@sakilacustomer.org,1,MIRACLE VIRTUAL,2006-01-01,PG-13,3,2.99,2655,2,20050731,Sunday,31,Weekend,July,3,2005,20050807,Sunday,7,Weekend,August,3,2005
128,MARJORIE,TUCKER,MARJORIE.TUCKER@sakilacustomer.org,1,CHAINSAW UPTOWN,2006-01-01,PG,6,0.99,606,1,20050731,Sunday,31,Weekend,July,3,2005,20050808,Monday,8,Weekday,August,3,2005
513,DUANE,TUBBS,DUANE.TUBBS@sakilacustomer.org,2,MARS ROMAN,2006-01-01,NC-17,6,0.99,2554,1,20050731,Sunday,31,Weekend,July,3,2005,20050809,Tuesday,9,Weekday,August,3,2005
377,HOWARD,FORTNER,HOWARD.FORTNER@sakilacustomer.org,1,LEBOWSKI SOLDIERS,2006-01-01,PG-13,6,2.99,2364,2,20050731,Sunday,31,Weekend,July,3,2005,20050808,Monday,8,Weekday,August,3,2005
443,FRANCISCO,SKIDMORE,FRANCISCO.SKIDMORE@sakilacustomer.org,2,LAWLESS VISION,2006-01-01,G,6,4.99,2344,1,20050731,Sunday,31,Weekend,July,3,2005,20050802,Tuesday,2,Weekday,August,3,2005
153,SUZANNE,NICHOLS,SUZANNE.NICHOLS@sakilacustomer.org,2,ALI FOREVER,2006-01-01,PG,4,4.99,67,2,20050731,Sunday,31,Weekend,July,3,2005,20050803,Wednesday,3,Weekday,August,3,2005


In [0]:
(spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f842c51ead0>

In [0]:
%sql
SELECT * FROM fact_rental_silver

customer_id,first_name,last_name,email,customer_store_id,title,release_year,rating,rental_duration,rental_rate,inventory_id,film_store_id,rental_date_key,rental_day_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,return_date_key,return_day_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_silver

col_name,data_type,comment
customer_id,bigint,
first_name,string,
last_name,string,
email,string,
customer_store_id,int,
title,varchar(128),
release_year,date,
rating,varchar(5),
rental_duration,tinyint,
rental_rate,"decimal(4,2)",


##### 4.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the number of films rented per customer each Month, along with the Customers' ID, First & Last Name, and the Month in which the rental was placed.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rental_by_customer_gold AS (
  SELECT customer_id AS CustomerID
    , last_name AS LastName
    , first_name AS FirstName
    , rental_month_name AS RentalMonth
    , rental_calendar_year AS RentalYear
    , COUNT(inventory_id) AS RentalCount
    , SUM(rental_rate) AS Revenue
  FROM sakila_dlh.fact_rental_silver
  GROUP BY CustomerID, LastName, FirstName, RentalYear, RentalMonth
  ORDER BY RentalCount DESC);

SELECT * FROM sakila_dlh.fact_monthly_rental_by_customer_gold;

CustomerID,LastName,FirstName,RentalMonth,RentalYear,RentalCount,Revenue
148,HUNT,ELEANOR,July,2005,22,79.78
102,FORD,CRYSTAL,July,2005,21,48.79
75,SANDERS,TAMMY,July,2005,20,49.8
236,DEAN,MARCIA,July,2005,20,59.8
595,GUNDERSON,TERRENCE,July,2005,19,62.81
30,KING,MELISSA,July,2005,19,58.81
366,HUEY,BRANDON,July,2005,19,48.81
526,SEAL,KARL,July,2005,19,54.81
354,NGO,JUSTIN,July,2005,19,46.81
137,KENNEDY,RHONDA,July,2005,19,64.81


In [0]:
%sql
-- Rental By Stores

CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rental_by_store_gold AS (
  SELECT film_store_id AS StoreID
    , rental_month_name AS RentalMonth
    , rental_calendar_year AS RentalYear
    , COUNT(inventory_id) AS RentalCount
    , SUM(rental_rate) AS Revenue
  FROM sakila_dlh.fact_rental_silver
  GROUP BY StoreID, RentalYear, RentalMonth
  ORDER BY RentalCount DESC);

SELECT * FROM sakila_dlh.fact_monthly_rental_by_store_gold;

StoreID,RentalMonth,RentalYear,RentalCount,Revenue
2,July,2005,3375,9861.25
1,July,2005,3334,9914.66
2,August,2005,2885,8464.15
1,August,2005,2801,8292.99
2,June,2005,1190,3444.1
1,June,2005,1121,3331.79
2,May,2005,581,1667.19
1,May,2005,575,1721.25
1,February,2006,92,249.08
2,February,2006,90,265.1
