## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "maf9tga-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "root",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0"
atlas_database_name = "sakila_rental_fact"
atlas_user_name = "maf9tga"
atlas_password = "Passw0rd123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals" #rentals = orders
#purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
#inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

#purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
#purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
#purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

#inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
#inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
#inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 
#dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
#dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.8rqtmzl.mongodb.net/{db_name}"
    #mongodb+srv://maf9tga:Passw0rd123@cluster0.8rqtmzl.mongodb.net/?retryWrites=true&w=majority
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://maf9tga-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_date",
  user "maf9tga",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://maf9tga-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_customer",
  user "maf9tga",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_customer" using data from the view named "view_customer"
CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "customer Dimension Table"
LOCATION "dbfs:/FileStore/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,boolean,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,True,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/source_data/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10476,1681864144000
dbfs:/FileStore/source_data/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2164,1681864143000
dbfs:/FileStore/source_data/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6263,1681864143000
dbfs:/FileStore/source_data/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,262,1681864143000
dbfs:/FileStore/source_data/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1480,1681864143000
dbfs:/FileStore/source_data/batch/sakila_customer.json,sakila_customer.json,136808,1683578049000
dbfs:/FileStore/source_data/batch/sakila_film.json,sakila_film.json,363527,1683578050000
dbfs:/FileStore/source_data/batch/sakila_inventory.json,sakila_inventory.json,410695,1683578050000
dbfs:/FileStore/source_data/batch/sakila_staff.json,sakila_staff.json,279,1683578050000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/source_data/batch'
json_files = {"customer" : 'sakila_customer.json', "inventory" : 'sakila_inventory.json', "staff" : 'sakila_staff.json', "film" : 'sakila_film.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[15]: <pymongo.results.InsertManyResult at 0x7fed23e1dbc0>

##### 2.3.1. Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_rental_fact").option("collection", "inventory").load()
.select("_id","inventory_id","film_id","store_id","last_update")

display(df_inventory)

_id,inventory_id,film_id,store_id,last_update
List(645982085abd1ab718098c33),1,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c34),2,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c35),3,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c36),4,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c37),5,1,2,2006-02-15 05:09:17
List(645982085abd1ab718098c38),6,1,2,2006-02-15 05:09:17
List(645982085abd1ab718098c39),7,1,2,2006-02-15 05:09:17
List(645982085abd1ab718098c3a),8,1,2,2006-02-15 05:09:17
List(645982085abd1ab718098c3b),9,2,2,2006-02-15 05:09:17
List(645982085abd1ab718098c3c),10,2,2,2006-02-15 05:09:17


In [0]:
%scala
df_inventory.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
_id,struct,
inventory_id,int,
film_id,int,
store_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

_id,inventory_id,film_id,store_id,last_update
List(645982085abd1ab718098c33),1,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c34),2,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c35),3,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c36),4,1,1,2006-02-15 05:09:17
List(645982085abd1ab718098c37),5,1,2,2006-02-15 05:09:17


##### 2.4.1 Fetch staff Dimension Data from the New MongoDB Collection

In [0]:
%scala

import com.mongodb.spark._

val df_staff = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_rental_fact").option("collection", "staff").load()
.select("_id","staff_id","first_name", "last_name", "address_id", "store_id", "active", "last_update")

display(df_staff)

_id,staff_id,first_name,last_name,address_id,store_id,active,last_update
List(6459820d5abd1ab718099e18),1,Mike,Hillyer,3,1,1,2006-02-15 03:57:16
List(6459820d5abd1ab718099e19),2,Jon,Stephens,4,2,1,2006-02-15 03:57:16


In [0]:
%scala
df_staff.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff

col_name,data_type,comment
_id,struct,
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
store_id,int,
active,int,
last_update,string,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5

_id,staff_id,first_name,last_name,address_id,store_id,active,last_update
List(6459820d5abd1ab718099e18),1,Mike,Hillyer,3,1,1,2006-02-15 03:57:16
List(6459820d5abd1ab718099e19),2,Jon,Stephens,4,2,1,2006-02-15 03:57:16


##### 2.5.1 Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_rental_fact").option("collection", "film").load()
.select("_id","film_id","title","rental_duration","rental_rate", "length", "replacement_cost", "last_update")

display(df_film)

_id,film_id,title,rental_duration,rental_rate,length,replacement_cost,last_update
List(6459820e5abd1ab718099e1a),1,ACADEMY DINOSAUR,6,0.99,86,20.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1b),2,ACE GOLDFINGER,3,4.99,48,12.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1c),3,ADAPTATION HOLES,7,2.99,50,18.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1d),4,AFFAIR PREJUDICE,5,2.99,117,26.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1e),5,AFRICAN EGG,6,2.99,130,22.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1f),6,AGENT TRUMAN,3,2.99,169,17.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e20),7,AIRPLANE SIERRA,6,4.99,62,28.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e21),8,AIRPORT POLLOCK,6,4.99,54,15.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e22),9,ALABAMA DEVIL,3,2.99,114,21.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e23),10,ALADDIN CALENDAR,6,4.99,63,24.99,2006-02-15 05:03:42


In [0]:
%scala
df_film.printSchema()

##### 2.5.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film

col_name,data_type,comment
_id,struct,
film_id,int,
title,string,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
last_update,string,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

_id,film_id,title,rental_duration,rental_rate,length,replacement_cost,last_update
List(6459820e5abd1ab718099e1a),1,ACADEMY DINOSAUR,6,0.99,86,20.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1b),2,ACE GOLDFINGER,3,4.99,48,12.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1c),3,ADAPTATION HOLES,7,2.99,50,18.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1d),4,AFFAIR PREJUDICE,5,2.99,117,26.99,2006-02-15 05:03:42
List(6459820e5abd1ab718099e1e),5,AFRICAN EGG,6,2.99,130,22.99,2006-02-15 05:03:42


#####DONT NEED TO DO
#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
employee_csv = f"{batch_dir}/sakila_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
3,Northwind Traders,Kotas,Jan,jan@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 3rd Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
4,Northwind Traders,Sergienko,Mariya,mariya@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 4th Avenue,Kirkland,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
5,Northwind Traders,Thorpe,Steven,steven@northwindtraders.com,Sales Manager,(123)555-0100,(123)555-0102,(123)555-0103,123 5th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
6,Northwind Traders,Neipper,Michael,michael@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 6th Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
7,Northwind Traders,Zare,Robert,robert@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 7th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
8,Northwind Traders,Giussani,Laura,laura@northwindtraders.com,Sales Coordinator,(123)555-0100,(123)555-0102,(123)555-0103,123 8th Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
9,Northwind Traders,Hellung-Larsen,Anne,anne@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 9th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#


In [0]:
df_employee.printSchema()

root
 |-- employee_key: integer (nullable = true)
 |-- company: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- business_phone: string (nullable = true)
 |-- home_phone: string (nullable = true)
 |-- fax_number: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- zip_postal_code: integer (nullable = true)
 |-- country_region: string (nullable = true)
 |-- web_page: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,boolean,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_employee LIMIT 5;

employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
3,Northwind Traders,Kotas,Jan,jan@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 3rd Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
4,Northwind Traders,Sergienko,Mariya,mariya@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 4th Avenue,Kirkland,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
5,Northwind Traders,Thorpe,Steven,steven@northwindtraders.com,Sales Manager,(123)555-0100,(123)555-0102,(123)555-0103,123 5th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#


##### 3.2 Use PySpark to Read Shipper Dimension Data from CSV File

In [0]:
shipper_csv = f"{batch_dir}/sakila_DimShippers.csv"

df_shipper = spark.read.format('csv').options(header='true', inferSchema='true').load(shipper_csv)
display(df_shipper)

shipper_key,company,address,city,state_province,zip_postal_code,country_region
1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA
3,Shipping Company C,123 Any Street,Memphis,TN,99999,USA


In [0]:
df_shipper.printSchema()

root
 |-- shipper_key: integer (nullable = true)
 |-- company: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- zip_postal_code: integer (nullable = true)
 |-- country_region: string (nullable = true)



In [0]:
df_shipper.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_shipper")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_shipper;

col_name,data_type,comment
shipper_key,int,
company,string,
address,string,
city,string,
state_province,string,
zip_postal_code,int,
country_region,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_shipper LIMIT 5;

shipper_key,company,address,city,state_province,zip_postal_code,country_region
1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA
3,Shipping Company C,123 Any Street,Memphis,TN,99999,USA


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_staff,False
,view_customer,True
,view_date,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) rentals Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_id INT")
 .option("cloudFiles.schemaHints", "rental_date DATETIME")
 .option("cloudFiles.schemaHints", "inventory_id MEDIUMINT")
 .option("cloudFiles.schemaHints", "customer_id SMALLINT") 
 .option("cloudFiles.schemaHints", "return_date DATETIME")
 .option("cloudFiles.schemaHints", "staff_id TINYINT")
 .option("cloudFiles.schemaHints", "last_update TIMESTAMP")
 .option("cloudFiles.schemaHints", "film_id SMALLINT")
 .option("cloudFiles.schemaHints", "store_id SMALLINT") 
 .option("cloudFiles.schemaHints", "rental_rate DECIMAL")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_id,film_id,inventory_id,last_update,rental_date,rental_date_key,rental_id,rental_rate,return_date,return_date_key,staff_id,store_id,_rescued_data,receipt_time,source_file
130,80,367,2023-03-22 16:08:25,2005-05-24 22:53:30,20050524,1,3,2005-05-26 22:04:30,0,1,1,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
459,333,1525,2023-03-22 16:08:25,2005-05-24 22:54:33,20050524,2,3,2005-05-28 19:40:33,0,1,2,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
408,373,1711,2023-03-22 16:08:25,2005-05-24 23:03:39,20050524,3,3,2005-06-01 22:12:39,0,1,2,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
333,535,2452,2023-03-22 16:08:25,2005-05-24 23:04:41,20050524,4,1,2005-06-03 01:43:41,0,2,1,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
222,450,2079,2023-03-22 16:08:25,2005-05-24 23:05:21,20050524,5,3,2005-06-02 04:33:21,0,1,2,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
549,613,2792,2023-03-22 16:08:25,2005-05-24 23:08:07,20050524,6,1,2005-05-27 01:32:07,0,1,1,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
269,870,3995,2023-03-22 16:08:25,2005-05-24 23:11:53,20050524,7,1,2005-05-29 20:34:53,0,2,2,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
239,510,2346,2023-03-22 16:08:25,2005-05-24 23:31:46,20050524,8,5,2005-05-27 23:33:46,0,2,1,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
126,565,2580,2023-03-22 16:08:25,2005-05-25 00:00:40,20050525,9,5,2005-05-28 00:22:40,0,1,1,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
399,396,1824,2023-03-22 16:08:25,2005-05-25 00:02:21,20050525,10,5,2005-05-31 22:44:21,0,2,2,,2023-05-09T14:01:48.651+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[27]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fed20b1d190>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_id,film_id,inventory_id,last_update,rental_date,rental_date_key,rental_id,rental_rate,return_date,return_date_key,staff_id,store_id,_rescued_data,receipt_time,source_file
130,80,367,2023-03-22 16:08:25,2005-05-24 22:53:30,20050524,1,3,2005-05-26 22:04:30,0,1,1,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
459,333,1525,2023-03-22 16:08:25,2005-05-24 22:54:33,20050524,2,3,2005-05-28 19:40:33,0,1,2,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
408,373,1711,2023-03-22 16:08:25,2005-05-24 23:03:39,20050524,3,3,2005-06-01 22:12:39,0,1,2,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
333,535,2452,2023-03-22 16:08:25,2005-05-24 23:04:41,20050524,4,1,2005-06-03 01:43:41,0,2,1,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
222,450,2079,2023-03-22 16:08:25,2005-05-24 23:05:21,20050524,5,3,2005-06-02 04:33:21,0,1,2,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
549,613,2792,2023-03-22 16:08:25,2005-05-24 23:08:07,20050524,6,1,2005-05-27 01:32:07,0,1,1,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
269,870,3995,2023-03-22 16:08:25,2005-05-24 23:11:53,20050524,7,1,2005-05-29 20:34:53,0,2,2,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
239,510,2346,2023-03-22 16:08:25,2005-05-24 23:31:46,20050524,8,5,2005-05-27 23:33:46,0,2,1,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
126,565,2580,2023-03-22 16:08:25,2005-05-25 00:00:40,20050525,9,5,2005-05-28 00:22:40,0,1,1,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json
399,396,1824,2023-03-22 16:08:25,2005-05-25 00:02:21,20050525,10,5,2005-05-31 22:44:21,0,2,2,,2023-05-09T14:02:35.596+0000,dbfs:/FileStore/source_data/stream/rentals/rental_fact_v2.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_id,bigint,
film_id,bigint,
inventory_id,bigint,
last_update,string,
rental_date,string,
rental_date_key,bigint,
rental_id,bigint,
rental_rate,"decimal(10,0)",
return_date,string,
return_date_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
SELECT r.rental_id,
r.rental_date,
r.inventory_id,
r.customer_id,
r.return_date,
r.staff_id,
r.last_update,
r.film_id,
r.store_id,
r.rental_rate
FROM rentals_silver_tempview as r
INNER JOIN sakila_dlh.dim_inventory as i
ON r.inventory_id = i.inventory_id
INNER JOIN sakila_dlh.dim_film as f
ON r.film_id = f.film_id
INNER JOIN sakila_dlh.dim_staff as s
ON r.staff_id = s.staff_id
)


In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[39]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fed191bc250>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,film_id,store_id,rental_rate


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
rental_id,bigint,
rental_date,string,
inventory_id,bigint,
customer_id,bigint,
return_date,string,
staff_id,bigint,
last_update,string,
film_id,bigint,
store_id,bigint,
rental_rate,"decimal(10,0)",


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customer_id AS CustomerID
  , COUNT(rental_id) AS ProductCount
FROM sakila_dlh.fact_rentals_silver
GROUP BY CustomerID
ORDER BY ProductCount DESC

#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-Lab6/