## Capstone Project

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "qeu5gn-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw2"

connection_properties = {
  "user" : "qeu5gn",
  "password" : "Twixie123!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.gof1nr3"
atlas_database_name = "sakila_dw2"
atlas_user_name = "qeu5gn"
atlas_password = "twixie123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

fact_rental_stream_dir = f"{stream_dir}/fact_rental"
# purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
# inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

rental_output_bronze = f"{database_dir}/fact_rental/bronze"
rental_output_silver = f"{database_dir}/fact_rental/silver"
rental_output_gold   = f"{database_dir}/fact_rental/gold"

# purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
# purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
# purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

# inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
# inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
# inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental", True) 

# dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
# dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[131]: True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://qeu5gn-mysql.mysql.database.azure.com:3306/northwind_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "qeu5gn",    --Replace with your User Name
  password "Twixie123!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Actor Dimension Data from an Azure MySQL database.

In [0]:
%sql

CREATE OR REPLACE TEMPORARY VIEW view_payment
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://qeu5gn-mysql.mysql.database.azure.com:3306/sakila_dw2", --Replace with your Server Name
  dbtable "dim_payment",
  user "qeu5gn",    --Replace with your User Name
  password "Twixie123!"  --Replace with you password
)

-- Create a Temporary View named "view_payment" that extracts data from your MySQL Sakila database.

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_payment
COMMENT "Payment Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_payment"
AS SELECT * FROM view_payment

-- Create a new table named "sakilia_dlh.dim_payment" using data from the view named "view_payment"

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment;

col_name,data_type,comment
payment_id,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,"decimal(5,2)",
payment_date,timestamp,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5

payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25T11:30:37.000+0000,2006-02-15T22:12:30.000+0000
2,1,1,573,0.99,2005-05-28T10:35:23.000+0000,2006-02-15T22:12:30.000+0000
3,1,1,1185,5.99,2005-06-15T00:54:12.000+0000,2006-02-15T22:12:30.000+0000
4,1,2,1422,0.99,2005-06-15T18:02:53.000+0000,2006-02-15T22:12:30.000+0000
5,1,2,1476,9.99,2005-06-15T21:08:46.000+0000,2006-02-15T22:12:30.000+0000


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

# add files to batch directory --> DONE

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers-1.json,Northwind_DimCustomers-1.json,10884,1701203536000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10884,1701203516000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees-1.csv,Northwind_DimEmployees-1.csv,2174,1701203536000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2174,1701203515000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices-1.json,Northwind_DimInvoices-1.json,6580,1701203536000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6580,1701203515000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimShippers-1.csv,Northwind_DimShippers-1.csv,266,1701203536000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,266,1701203515000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimSuppliers-1.json,Northwind_DimSuppliers-1.json,1552,1701203536000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1552,1701203515000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {"customer" : 'sakila_dim_customer.json'
              , "inventory" : 'sakila_dim_inventory.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[159]: <pymongo.results.InsertManyResult at 0x7f6a54954300>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// mongodb username and password

val userName = "qeu5gn"
val pwd = "twixie123"
val clusterName = "cluster0.gof1nr3"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "customer").load()
.select("customer_id","store_id","first_name","last_name","email","address_id","active", "create_date", "last_update")

display(df_customer)

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14 22:04:36,2006-02-15 04:57:20
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14 22:04:36,2006-02-15 04:57:20
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14 22:04:36,2006-02-15 04:57:20
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14 22:04:36,2006-02-15 04:57:20
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_id,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
create_date,string,
last_update,string,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


##### 2.4.1 Fetch Inventory Dimension Data from the New MongoDB Collection




In [0]:
%scala

import com.mongodb.spark._

// mongodb username and password

val userName = "qeu5gn"
val pwd = "twixie123"
val clusterName = "cluster0.gof1nr3"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"


val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw2")
.option("collection", "inventory").load()
.select("inventory_id","film_id","store_id", "last_update")

display(df_inventory)

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17
6,1,2,2006-02-15 05:09:17
7,1,2,2006-02-15 05:09:17
8,1,2,2006-02-15 05:09:17
9,2,2,2006-02-15 05:09:17
10,2,2,2006-02-15 05:09:17


In [0]:
%scala
df_inventory.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Created Time,Thu Dec 07 18:41:34 UTC 2023,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
inventory_csv = f"{batch_dir}/sakila_dim_staff.csv"

# use dbfs files

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(inventory_csv)
display(df_staff)

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16.000+0000


In [0]:
df_staff.printSchema()

root
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- picture: string (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
picture,string,
email,string,
store_id,int,
active,int,
username,string,
password,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000
2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16.000+0000


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", rental_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(fact_rental_stream_dir)
 .createOrReplaceTempView("rental_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_raw_tempview
)

In [0]:
%sql
SELECT * FROM rental_bronze_tempview

customer_id,inventory_id,last_update,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
437,3075,2023-12-07 17:46:27,20050528,666,20050605,2,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
596,797,2023-12-07 17:46:27,20050528,667,20050531,1,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
484,3528,2023-12-07 17:46:27,20050528,668,20050529,1,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
313,3677,2023-12-07 17:46:27,20050528,669,20050603,1,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
201,227,2023-12-07 17:46:27,20050528,670,20050606,2,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
14,1027,2023-12-07 17:46:27,20050528,671,20050603,2,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
306,697,2023-12-07 17:46:27,20050528,672,20050606,2,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
468,1769,2023-12-07 17:46:27,20050528,673,20050601,1,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
87,1150,2023-12-07 17:46:27,20050528,674,20050601,2,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
338,1273,2023-12-07 17:46:27,20050528,675,20050601,2,,2023-12-07T18:49:22.796+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json


In [0]:
(spark.table("rental_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_bronze"))

Out[173]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f6a549ea2e0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rental_bronze")
  .createOrReplaceTempView("rental_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview

customer_id,inventory_id,last_update,rental_date_key,rental_id,return_date_key,staff_id,_rescued_data,receipt_time,source_file
437,3075,2023-12-07 17:46:27,20050528,666,20050605,2,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
596,797,2023-12-07 17:46:27,20050528,667,20050531,1,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
484,3528,2023-12-07 17:46:27,20050528,668,20050529,1,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
313,3677,2023-12-07 17:46:27,20050528,669,20050603,1,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
201,227,2023-12-07 17:46:27,20050528,670,20050606,2,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
14,1027,2023-12-07 17:46:27,20050528,671,20050603,2,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
306,697,2023-12-07 17:46:27,20050528,672,20050606,2,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
468,1769,2023-12-07 17:46:27,20050528,673,20050601,1,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
87,1150,2023-12-07 17:46:27,20050528,674,20050601,2,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json
338,1273,2023-12-07 17:46:27,20050528,675,20050601,2,,2023-12-07T18:50:38.586+0000,dbfs:/FileStore/lab_data/retail/stream/fact_rental/fact_rental_3.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
last_update,string,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,bigint,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql

CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview AS (
  SELECT c.customer_id AS customer_key,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      i.inventory_id AS inventory_key,
      i.film_id,
      i.store_id,
      r.rental_date_key,
      rd.day_name_of_week AS rental_day_name_of_week,
      rd.day_of_month AS rental_day_of_month,
      rd.weekday_weekend AS rental_weekday_weekend,
      rd.month_name AS rental_month_name,
      rd.calendar_quarter AS rental_quarter,
      rd.calendar_year AS rental_year,
      p.rental_id AS rental_key,
      p.payment_id AS payment_key,
      p.amount AS payment_amount,
      r.return_date_key,
      rtd.day_name_of_week AS return_day_name_of_week,
      rtd.day_of_month AS return_day_of_month,
      rtd.weekday_weekend AS return_weekday_weekend,
      rtd.month_name AS return_month_name,
      rtd.calendar_quarter AS return_quarter,
      rtd.calendar_year AS return_year,
      s.staff_id AS staff_key,
      s.first_name AS staff_first_name,
      s.last_name AS staff_last_name
  FROM rental_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_id = r.customer_id
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_id = r.inventory_id
  INNER JOIN sakila_dlh.dim_payment AS p
  ON p.rental_id = r.rental_id
  INNER JOIN sakila_dlh.dim_staff AS s 
  ON s.staff_id = r.staff_id
  INNER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = r.rental_date_key
  INNER JOIN sakila_dlh.dim_date AS rtd
  ON rtd.date_key = r.return_date_key
)

In [0]:
(spark.table("fact_rental_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_silver"))

Out[189]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f6a549bd4f0>

In [0]:
%sql
SELECT * FROM fact_rental_silver

customer_first_name,customer_last_name,film_id,store_id,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,payment_key,payment_amount,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,staff_first_name,staff_last_name
ENRIQUE,FORSYTHE,174,1,20050528,Saturday,28,Weekend,May,2,2005,15958,4.99,20050531,Tuesday,31,Weekday,May,2,2005,Mike,Hillyer
VICKI,FIELDS,51,1,20050528,Saturday,28,Weekend,May,2,2005,5446,6.99,20050606,Monday,6,Weekday,June,2,2005,Jon,Stephens
CHARLES,KOWALSKI,152,1,20050528,Saturday,28,Weekend,May,2,2005,8295,6.99,20050606,Monday,6,Weekday,June,2,2005,Jon,Stephens
KARL,SEAL,201,1,20050528,Saturday,28,Weekend,May,2,2005,14137,4.99,20050606,Monday,6,Weekday,June,2,2005,Jon,Stephens
JOHNNIE,CHISHOLM,162,2,20050529,Sunday,29,Weekend,May,2,2005,15289,3.99,20050603,Friday,3,Weekday,June,2,2005,Jon,Stephens
CAROL,GARCIA,174,2,20050529,Sunday,29,Weekend,May,2,2005,470,4.99,20050602,Thursday,2,Weekday,June,2,2005,Jon,Stephens
YOLANDA,WEAVER,138,2,20050529,Sunday,29,Weekend,May,2,2005,5150,2.99,20050603,Friday,3,Weekday,June,2,2005,Jon,Stephens
DANNY,ISOM,129,2,20050529,Sunday,29,Weekend,May,2,2005,10786,6.99,20050605,Sunday,5,Weekend,June,2,2005,Mike,Hillyer
MARIAN,MENDOZA,26,1,20050529,Sunday,29,Weekend,May,2,2005,6657,3.99,20050604,Saturday,4,Weekend,June,2,2005,Jon,Stephens
EDITH,MCDONALD,111,2,20050529,Sunday,29,Weekend,May,2,2005,3148,0.99,20050530,Monday,30,Weekday,May,2,2005,Mike,Hillyer


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_silver

col_name,data_type,comment
customer_first_name,string,
customer_last_name,string,
film_id,int,
store_id,int,
rental_date_key,bigint,
rental_day_name_of_week,string,
rental_day_of_month,int,
rental_weekday_weekend,string,
rental_month_name,string,
rental_quarter,int,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The tables include information regarding total transactions and revenue from specific customers, as well as analysis about the total transactions and revenue weekly.


In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_customer_payments AS (
  SELECT sum(payment_amount) AS TotalPayments
    , customer_last_name AS Customer
  FROM sakila_dlh.fact_rental_silver
  GROUP BY Customer
  ORDER BY TotalPayments DESC);

SELECT * FROM sakila_dlh.fact_customer_payments;

TotalPayments,Customer
35.94,CHISHOLM
33.92,BARBEE
31.94,THRASHER
31.92,POULIN
29.96,CHAPMAN
29.96,SULLIVAN
29.9,PETERS
25.96,HART
25.96,HICKS
23.96,PARKER


In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_rental_week_breakdown_gold AS (
  SELECT SUM(payment_amount) AS TotalRevenue
    , COUNT(payment_amount) AS TotalTransactions
    , rental_weekday_weekend AS TimeOfWeek
  FROM sakila_dlh.fact_rental_silver
  GROUP BY TimeOfWeek
  ORDER BY TotalRevenue DESC);

SELECT * FROM sakila_dlh.fact_rental_week_breakdown_gold;

TotalRevenue,TotalTransactions,TimeOfWeek
2394.32,568,Weekday
610.36,164,Weekend


In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_rental_week_total_breakdown_gold AS (
  SELECT SUM(payment_amount) AS TotalRevenue
    , COUNT(payment_amount) AS TotalTransactions
    , rental_day_name_of_week AS DayofWeek
  FROM sakila_dlh.fact_rental_silver
  GROUP BY DayofWeek
  ORDER BY TotalRevenue DESC);

SELECT * FROM sakila_dlh.fact_rental_week_total_breakdown_gold;

TotalRevenue,TotalTransactions,DayofWeek
877.7,230,Wednesday
740.5,150,Thursday
327.26,74,Tuesday
311.14,86,Saturday
299.22,78,Sunday
251.42,58,Friday
197.44,56,Monday


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.