## Final_Project: Data Lakehouse with Structured Streaming_documentation
Rita Wu    nzw2tzt
### Overview
This file is a databricks experiment that using the ETL process to deal with Sakila database. I will show how to read in files from different sources and transform it into a fact_order database that combines all needed information.

- The method includes: read files from Mysql Azure, mongoDB, JSON, CSV; using data intergration patterns to clean the data and select the most valuable dataset that are meaningful to the users.

#### Dimension tables
  - Dim_date: an additional date database showing the time/day/week the rental deals happens
  - Dim_rental: all the information related to the film theater rental.
  - Dim_store: where the rental deals take place
  - Dim_staff: who process the rental deals
  - Dim_inventory: transaction database
  - Dim_customer: the data of customers who go to the theater to watch the films
  - Dim_film: the films showing in each film theater 
#### Procedure
- pre step
  = re-run sakila data in new Mysql Azure database
  - re-run midterm code "my sample sql" to produce dim store, dim staff, dim customer, dim inventory and export them to JSON files
  - re-run midterm code.ipynb to create fact_orders table
- Cold path data
  - read in date dimension from Mysql Azure
  - read in rental dimension from Mysql Azure
  - read in staff dimension from MongoDB (JSON)
  - read in store dimension from MongoDB (JSON)
  - read in inventory dimension from CSV file
  - read in film dimension from CSV file
  - read in customer dimension from CSV file
   * fact_orders table already merged rental dimension with staff, store, and inventory
- Hot path data
  - proceed fact_orders data using databricks spark
    - load in raw data and create bronze table
    - include reference data and merge them in silver table
      - join "rental data, store data, date data" with the fact_orders table
    - selected need data in gold table
- Data visualization
  - I choose to visualize the relationship between average rental duration and the weekdays the deal is on, and group it by the staff selling the theater. It aims to show which day is the luck day for film rental (that has the longest rental duration, which usually means more revenue.) And which staff has a better selling performance.
  - the result is surprising: the deals on Tuesdays has significantly higer rental duration, and Hillyer has a better selling performance on most of the days.

####### Support theories
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "nzw2zt-sql-ds.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "nzw2zt",
  "password" : "Wsr2022winwin@",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster1.z1qvver"
atlas_database_name = "sakila"
atlas_user_name = "rita"
atlas_password = "Passw0rd123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

fact_table_dir = f"{stream_dir}/sakila_fact_order"
#purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
#purchase_orders_stream_dir = f/dbfs/FileStore/source_data/stream/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/sakila_inventory"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 final Database"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 final");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
 url "jdbc:mysql://nzw2zt-sql-ds.mysql.database.azure.com:3306/sakila_dw1",
  dbtable "dim_date",
  user "nzw2zt",
  password "Wsr2022winwin@"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Rental Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_rental" that extracts data from your MySQL sakila_dw1 database.
CREATE OR REPLACE TEMPORARY VIEW view_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://nzw2zt-sql-ds.mysql.database.azure.com:3306/sakila_dw1",
  dbtable "dim_rental",
  user "nzw2zt",
  password "Wsr2022winwin@"
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_rental" using data from the view named "view_rental"
USE DATABASE sakila_dlh;
CREATE OR REPLACE TABLE sakila_dlh.dim_rental
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_rental"
AS SELECT * FROM view_rental

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rental;

col_name,data_type,comment
rental_key,int,
rental_date,timestamp,
inventory_id,bigint,
customer_id,int,
return_date,timestamp,
staff_id,int,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rental LIMIT 5

rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
1,2005-05-24T22:53:30.000+0000,367,130,2005-05-26T22:04:30.000+0000,1,2006-02-15T21:30:53.000+0000
2,2005-05-24T22:54:33.000+0000,1525,459,2005-05-28T19:40:33.000+0000,1,2006-02-15T21:30:53.000+0000
3,2005-05-24T23:03:39.000+0000,1711,408,2005-06-01T22:12:39.000+0000,1,2006-02-15T21:30:53.000+0000
4,2005-05-24T23:04:41.000+0000,2452,333,2005-06-03T01:43:41.000+0000,2,2006-02-15T21:30:53.000+0000
5,2005-05-24T23:05:21.000+0000,2079,222,2005-06-02T04:33:21.000+0000,1,2006-02-15T21:30:53.000+0000


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/source_data/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10476,1681831324000
dbfs:/FileStore/source_data/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2164,1681831324000
dbfs:/FileStore/source_data/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6263,1681831324000
dbfs:/FileStore/source_data/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,262,1681831324000
dbfs:/FileStore/source_data/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1480,1681831324000
dbfs:/FileStore/source_data/batch/dim_customer.csv,dim_customer.csv,61426,1682447894000
dbfs:/FileStore/source_data/batch/dim_film.csv,dim_film.csv,43092,1682447590000
dbfs:/FileStore/source_data/batch/dim_inventory.csv,dim_inventory.csv,9428,1682447335000
dbfs:/FileStore/source_data/batch/dim_staff.json,dim_staff.json,600,1682444909000
dbfs:/FileStore/source_data/batch/dim_store.json,dim_store.json,227,1682444847000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/source_data/batch'
json_files = {"staff" : 'dim_staff.json', "store" : 'dim_store.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[17]: <pymongo.results.InsertManyResult at 0x7f701844cb40>

##### 2.3.1. Fetch Staff Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_staff = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila").option("collection", "staff").load()
.select("staff_key", "first_name", "last_name", "address_id", "email","store_id", "active" ,"username","password","last_update")

display(df_staff)

staff_key,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [0]:
%scala
df_staff.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Staff Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
password,string,
last_update,string,


In [0]:
#%sql
#SELECT * FROM sakila_dlh.dim_staff LIMIT 5 

##### 2.4.1 Fetch Store Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_store= spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila").option("collection", "store").load()
.select("store_key","manager_staff_id","address_id","last_update")

display(df_store)

store_key,manager_staff_id,address_id,last_update
1,1,1,2006-02-15 04:57:12
2,2,2,2006-02-15 04:57:12


In [0]:
%scala
df_store.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Store Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_store.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("sakila_dlh.dim_store")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store

col_name,data_type,comment
store_key,int,
manager_staff_id,int,
address_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_store,
Type,MANAGED,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5

store_key,manager_staff_id,address_id,last_update
1,1,1,2006-02-15 04:57:12
2,2,2,2006-02-15 04:57:12


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read inventory dimention data From a CSV File

In [0]:
inventory_csv = f"/FileStore/source_data/batch/dim_inventory.csv"

df_inventory = spark.read.format('csv').options(header='true', inferSchema='true').load(inventory_csv)
display(df_inventory)

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2
6,1,2
7,1,2
8,1,2
9,2,2
10,2,2


In [0]:
df_inventory.printSchema()

root
 |-- inventory_id: integer (nullable = true)
 |-- film_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)



In [0]:
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,MANAGED,
Location,dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_inventory,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5;

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


##### 3.2 Use PySpark to Read film Dimension Data from CSV File

In [0]:
#shipper_csv = f"{batch_dir}/Northwind_DimShippers.csv"
film_csv = f"dbfs:/FileStore/source_data/batch/dim_film.csv"
df_film = spark.read.format('csv').options(header='true', inferSchema='true').load(film_csv)
display(df_film)


film_id,title,release_year,rental_duration,rental_rate,replacement_cost,rating
1,ACADEMY DINOSAUR,2006,6,0.99,20.99,PG
2,ACE GOLDFINGER,2006,3,4.99,12.99,G
3,ADAPTATION HOLES,2006,7,2.99,18.99,NC-17
4,AFFAIR PREJUDICE,2006,5,2.99,26.99,G
5,AFRICAN EGG,2006,6,2.99,22.99,G
6,AGENT TRUMAN,2006,3,2.99,17.99,PG
7,AIRPLANE SIERRA,2006,6,4.99,28.99,PG-13
8,AIRPORT POLLOCK,2006,6,4.99,15.99,R
9,ALABAMA DEVIL,2006,3,2.99,21.99,PG-13
10,ALADDIN CALENDAR,2006,6,4.99,24.99,NC-17


In [0]:
df_film.printSchema()

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- replacement_cost: double (nullable = true)
 |-- rating: string (nullable = true)



In [0]:
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_id,int,
title,string,
release_year,int,
rental_duration,int,
rental_rate,double,
replacement_cost,double,
rating,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5;

film_id,title,release_year,rental_duration,rental_rate,replacement_cost,rating
1,ACADEMY DINOSAUR,2006,6,0.99,20.99,PG
2,ACE GOLDFINGER,2006,3,4.99,12.99,G
3,ADAPTATION HOLES,2006,7,2.99,18.99,NC-17
4,AFFAIR PREJUDICE,2006,5,2.99,26.99,G
5,AFRICAN EGG,2006,6,2.99,22.99,G


##### 3.3 Use PySpark to Read customer Dimension Data from CSV File

In [0]:
customer_csv = f"dbfs:/FileStore/source_data/batch/dim_customer.csv"
df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
display(df_customer)

customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000


In [0]:
df_customer.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
create_date,timestamp,
last_update,timestamp,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5;

customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_rental,False
sakila_dlh,dim_staff,False
sakila_dlh,dim_store,False
,view_date,True
,view_rental,True


### Section III: Integrate Reference Data with Real-Time Data
#### 4.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 4.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_id double")
 .option("cloudFiles.schemaHints", "inventory_id double")
 .option("cloudFiles.schemaHints", "film_id bigint") 
 .option("cloudFiles.schemaHints", "staff_id double")
 .option("cloudFiles.schemaHints", "store_id double")
 .option("cloudFiles.schemaHints", "customer_id double")
 .option("cloudFiles.schemaHints", "rental_duration bigint")
 .option("cloudFiles.schemaHints", "first_name_staff text") 
 .option("cloudFiles.schemaHints", "last_name_staff text")
 .option("cloudFiles.schemaHints", "first_name_customer text")
 .option("cloudFiles.schemaHints", "last_name_customer text")
 .option("cloudFiles.schemaHints", "title text")
 .option("cloudFiles.schemaHints", "rating text")
 .option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 .option("cloudFiles.schemaHints", "release_year bigint")
 .option("cloudFiles.schemaHints", "order_date_key double")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(fact_table_dir)
 .createOrReplaceTempView("fact_table_raw_tempview"))

 

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW fact_table_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM fact_table_raw_tempview
)

In [0]:
%sql
SELECT * FROM fact_table_bronze_tempview

customer_id,fact_key,film_id,first_name_customer,first_name_staff,inventory_id,last_name_customer,last_name_staff,order_date_key,rating,release_year,rental_duration,rental_id,staff_id,store_id,title,_rescued_data,receipt_time,source_file
343,101,82,DOUGLAS,Mike,373,GRAF,Hillyer,20050525.0,G,2006,3,102,1,1,BLOOD ARGONAUTS,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
384,102,735,ERNEST,Jon,3343,STEPP,Stephens,20050525.0,PG-13,2006,7,103,1,2,ROBBERS JOON,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
310,103,932,DANIEL,Jon,4281,CABRAL,Stephens,20050525.0,G,2006,3,104,1,2,VALLEY PACKER,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
108,104,173,TRACY,Mike,794,COLE,Hillyer,20050525.0,PG-13,2006,7,105,2,1,CONFESSIONS MAGUIRE,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
196,105,791,ALMA,Mike,3627,AUSTIN,Hillyer,20050525.0,PG-13,2006,3,106,2,1,SHOW LORD,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
317,106,621,EDWARD,Jon,2833,BAUGH,Stephens,20050525.0,PG-13,2006,5,107,2,2,NETWORK PEAK,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
242,107,724,GLENDA,Mike,3289,FRAZIER,Hillyer,20050525.0,R,2006,5,108,1,1,REMEMBER DIARY,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
503,108,233,ANGEL,Mike,1044,BARCLAY,Hillyer,20050525.0,PG,2006,3,109,2,1,DISCIPLE MOTHER,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
19,109,893,RUTH,Mike,4108,MARTINEZ,Hillyer,20050525.0,PG,2006,4,110,2,1,TITANS JERK,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
227,110,814,COLLEEN,Mike,3725,BURTON,Hillyer,20050525.0,PG,2006,6,111,1,1,SNATCH SLIPPER,,2023-04-27T15:29:03.058+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json


In [0]:
(spark.table("fact_table_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

Out[41]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f7018235af0>

##### 4.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_id,fact_key,film_id,first_name_customer,first_name_staff,inventory_id,last_name_customer,last_name_staff,order_date_key,rating,release_year,rental_duration,rental_id,staff_id,store_id,title,_rescued_data,receipt_time,source_file
343,101,82,DOUGLAS,Mike,373,GRAF,Hillyer,20050525.0,G,2006,3,102,1,1,BLOOD ARGONAUTS,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
384,102,735,ERNEST,Jon,3343,STEPP,Stephens,20050525.0,PG-13,2006,7,103,1,2,ROBBERS JOON,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
310,103,932,DANIEL,Jon,4281,CABRAL,Stephens,20050525.0,G,2006,3,104,1,2,VALLEY PACKER,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
108,104,173,TRACY,Mike,794,COLE,Hillyer,20050525.0,PG-13,2006,7,105,2,1,CONFESSIONS MAGUIRE,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
196,105,791,ALMA,Mike,3627,AUSTIN,Hillyer,20050525.0,PG-13,2006,3,106,2,1,SHOW LORD,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
317,106,621,EDWARD,Jon,2833,BAUGH,Stephens,20050525.0,PG-13,2006,5,107,2,2,NETWORK PEAK,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
242,107,724,GLENDA,Mike,3289,FRAZIER,Hillyer,20050525.0,R,2006,5,108,1,1,REMEMBER DIARY,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
503,108,233,ANGEL,Mike,1044,BARCLAY,Hillyer,20050525.0,PG,2006,3,109,2,1,DISCIPLE MOTHER,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
19,109,893,RUTH,Mike,4108,MARTINEZ,Hillyer,20050525.0,PG,2006,4,110,2,1,TITANS JERK,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json
227,110,814,COLLEEN,Mike,3725,BURTON,Hillyer,20050525.0,PG,2006,6,111,1,1,SNATCH SLIPPER,,2023-04-27T15:29:07.394+0000,dbfs:/FileStore/source_data/stream/sakila_fact_order/fact_table02.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_id,bigint,
fact_key,bigint,
film_id,bigint,
first_name_customer,string,
first_name_staff,string,
inventory_id,bigint,
last_name_customer,string,
last_name_staff,string,
order_date_key,double,
rating,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.`fact_key`,
    o.`rental_id`,
    o.`inventory_id`,
    o.`film_id`,
    o.`staff_id`,
    o.`store_id`,
    o.`customer_id`,
    o.`rental_duration`,
    o.`first_name_staff`,
    o.`last_name_staff`,
    o.`first_name_customer`,
    o.`last_name_customer`,
    o.`title`,
    o.`rating`,
    o.`release_year`,
    o.`order_date_key`,
    r.`rental_key`,
    r.`rental_date`,
    r.`return_date`,
    store.`store_key`,
    store.`manager_staff_id`,
    store.`address_id`,
    od.day_name_of_week AS order_day_name_of_week,
    od.day_of_month AS order_day_of_month,
    od.weekday_weekend AS order_weekday_weekend,
    od.month_name AS order_month_name,
    od.calendar_quarter AS order_quarter,
    od.calendar_year AS order_year
  FROM orders_silver_tempview AS o
  INNER JOIN sakila_dlh.dim_rental AS r
  ON r.rental_key = o.rental_id
  INNER JOIN sakila_dlh.dim_store AS store
  ON store.store_key = o.store_id
  LEFT OUTER JOIN sakila_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

Out[46]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f70184789a0>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_key,rental_id,inventory_id,film_id,staff_id,store_id,customer_id,rental_duration,first_name_staff,last_name_staff,first_name_customer,last_name_customer,title,rating,release_year,order_date_key,rental_key,rental_date,return_date,store_key,manager_staff_id,address_id,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year
0,1,367,80,1,1,130,7,Mike,Hillyer,CHARLOTTE,HUNTER,BLANKET BEVERLY,G,2006,20050524.0,1,2005-05-24T22:53:30.000+0000,2005-05-26T22:04:30.000+0000,1,1,1,Tuesday,24,Weekday,May,2,2005
1,2,1525,333,1,1,459,7,Mike,Hillyer,TOMMY,COLLAZO,FREAKY POCUS,R,2006,20050524.0,2,2005-05-24T22:54:33.000+0000,2005-05-28T19:40:33.000+0000,1,1,1,Tuesday,24,Weekday,May,2,2005
2,3,1711,373,1,1,408,7,Mike,Hillyer,MANUEL,MURRELL,GRADUATE LORD,G,2006,20050524.0,3,2005-05-24T23:03:39.000+0000,2005-06-01T22:12:39.000+0000,1,1,1,Tuesday,24,Weekday,May,2,2005
3,4,2452,535,2,2,333,6,Jon,Stephens,ANDREW,PURDY,LOVE SUICIDES,R,2006,20050524.0,4,2005-05-24T23:04:41.000+0000,2005-06-03T01:43:41.000+0000,2,2,2,Tuesday,24,Weekday,May,2,2005
4,5,2079,450,1,2,222,5,Jon,Stephens,DELORES,HANSEN,IDOLS SNATCHERS,NC-17,2006,20050524.0,5,2005-05-24T23:05:21.000+0000,2005-06-02T04:33:21.000+0000,2,2,2,Tuesday,24,Weekday,May,2,2005
5,6,2792,613,1,1,549,5,Mike,Hillyer,NELSON,CHRISTENSON,MYSTIC TRUMAN,NC-17,2006,20050524.0,6,2005-05-24T23:08:07.000+0000,2005-05-27T01:32:07.000+0000,1,1,1,Tuesday,24,Weekday,May,2,2005
6,7,3995,870,2,1,269,4,Mike,Hillyer,CASSANDRA,WALTERS,SWARM GOLD,PG-13,2006,20050524.0,7,2005-05-24T23:11:53.000+0000,2005-05-29T20:34:53.000+0000,1,1,1,Tuesday,24,Weekday,May,2,2005
7,8,2346,510,2,2,239,6,Jon,Stephens,MINNIE,ROMERO,LAWLESS VISION,G,2006,20050524.0,8,2005-05-24T23:31:46.000+0000,2005-05-27T23:33:46.000+0000,2,2,2,Tuesday,24,Weekday,May,2,2005
8,9,2580,565,1,1,126,6,Mike,Hillyer,ELLEN,SIMPSON,MATRIX SNOWMAN,PG-13,2006,20050525.0,9,2005-05-25T00:00:40.000+0000,2005-05-28T00:22:40.000+0000,1,1,1,Wednesday,25,Weekday,May,2,2005
9,10,1824,396,2,1,399,5,Mike,Hillyer,DANNY,ISOM,HANGING DEEP,G,2006,20050525.0,10,2005-05-25T00:02:21.000+0000,2005-05-31T22:44:21.000+0000,1,1,1,Wednesday,25,Weekday,May,2,2005


In [0]:
%sql
DESCRIBE EXTENDED northwind_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
order_key,bigint,
employee_key,bigint,
employee_last_name,string,
employee_first_name,string,
employee_job_title,string,
employee_company,string,
customer_key,bigint,
customer_last_name,string,
customer_first_name,string,


##### 4.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT 
  order_day_name_of_week,
  avg (rental_duration),
  last_name_staff,
  rating,
  release_year
FROM sakila_dlh.fact_orders_silver
GROUP BY order_day_name_of_week, last_name_staff, rating, release_year
ORDER BY release_year DESC

order_day_name_of_week,avg(rental_duration),last_name_staff,rating,release_year
Wednesday,4.727272727272728,Stephens,PG,2006
Thursday,5.409090909090909,Hillyer,NC-17,2006
Wednesday,5.25,Hillyer,G,2006
Sunday,4.785714285714286,Hillyer,NC-17,2006
Tuesday,5.0,Stephens,G,2006
Sunday,5.291666666666667,Hillyer,PG,2006
Monday,5.434782608695652,Stephens,NC-17,2006
Thursday,5.5,Hillyer,PG-13,2006
Monday,5.375,Hillyer,NC-17,2006
Tuesday,5.5,Hillyer,PG-13,2006


Output can only be rendered in Databricks

#### 5.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-lab06/

