## Capstone Project: Data Lakehouse with Structured Streaming from Sakila Database

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables
##### Make sure to replace all values for variables that are commented to be replace for connecting MySql and MongoDB Atlas!!

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "aww5kx-mysql.mysql.database.azure.com" #replace
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "aww5kx",                    #replace
  "password" : "passw0!d",              #replace
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.ttnfyow"  #replace
atlas_database_name = "sakila"          #replace only if db name is different
atlas_user_name = "herinseo03"          #replace
atlas_password = "q9BZnYnqWdgHq4T5"     #replace

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

### 2.1. Loading all necessary items in MySQL and MongoDB
#### Connecting Databricks Workspace to MySQLWorkbench
##### a. In your resource group, create a MySQL flexible server. Remember the host and password.
##### b. Go into MySql Workbench and create a new server with the hostname being the jdbc_hostname and input your correct password to establish a connection
##### c. Make sure that public access (ip address) is allowed in order to successfly establish a connection
#### Creating the database through MySql Workbench
##### Open each of the 4 sql files provided in the folder "sakila-db-sql" from the Github link and run the files in this order in MYSql Workbench: (1)sakila-schema.sql (2)sakila-data.sql (3)create_sakila_dims.sql (4)populate_DimDate.sql

#### Uploading JSON files to MongoDB
##### a. Login to MongoDB Atlas and in your cluster, create a database named "sakila".
##### b. Within the created database, upload all json files that were provided in the submission GitHub from the folder called "sakila-json".

#### 3.0. Define Global Functions

In [0]:
#####################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
#####################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    return dframe

##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database. (sakila data lake house: sakila_dlh)

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "Sakila Database"
LOCATION "dbfs:/FileStore/data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
--PART 1. FETCHING DIMDATE TABLE FROM MYSQL SAKILA DATABASE
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "DimDate",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- PART 2. STORING dim_date IN DATABRICKS DBFS
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_date" using data from the view named "view_date"
CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_date;

-- -- Query the table
-- SELECT * FROM sakila_dlh.dim_date;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
DateKey,int,
Date,date,
Day,int,
DaySuffix,char(2),
Weekday,int,
WeekDayName,varchar(10),
WeekDayName_Short,char(3),
WeekDayName_FirstLetter,char(1),
DOWInMonth,int,
DayOfYear,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

DateKey,Date,Day,DaySuffix,Weekday,WeekDayName,WeekDayName_Short,WeekDayName_FirstLetter,DOWInMonth,DayOfYear,WeekOfMonth,WeekOfYear,Month,MonthName,MonthName_Short,MonthName_FirstLetter,Quarter,QuarterName,Year,MMYYYY,MonthYear,IsWeekend,IsHoliday,HolidayName,SpecialDays,FinancialYear,FinancialQuater,FinancialMonth,FirstDateofYear,LastDateofYear,FirstDateofQuater,LastDateofQuater,FirstDateofMonth,LastDateofMonth,FirstDateofWeek,LastDateofWeek,CurrentYear,CurrentQuater,CurrentMonth,CurrentWeek,CurrentDay
20000101,2000-01-01,1,st,7,Saturday,SAT,S,1,1,0,52,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,,,,,,,,,2024,2,5,19,127
20000102,2000-01-02,2,nd,1,Sunday,SUN,S,2,2,0,52,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,,,,,,,,,2024,2,5,19,127
20000103,2000-01-03,3,rd,2,Monday,MON,M,3,3,1,1,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,,,,,,,,,2024,2,5,19,127
20000104,2000-01-04,4,th,3,Tuesday,TUE,T,4,4,1,1,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,,,,,,,,,2024,2,5,19,127
20000105,2000-01-05,5,th,4,Wednesday,WED,W,5,5,1,1,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,,,,,,,,,2024,2,5,19,127


##### 1.3. Create a New Table that Sources dim_actor from an Azure MySQL database.

In [0]:
%sql
--DIM_ACTOR CREATION & POPULATION
-- Create a Temporary View named "view_dim_actor" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_dim_actor
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_actor",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_actor" using data from the view named "view_dim_actor"
CREATE OR REPLACE TABLE sakila_dlh.dim_actor
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_dim_actor;

-- -- Query the table
-- SELECT * FROM sakila_dlh.dim_actor;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_actor;

col_name,data_type,comment
actor_key,int,
actor_id,int,
first_name,varchar(45),
last_name,varchar(45),
last_update,timestamp,
actor_full_name,varchar(100),
,,
# Delta Statistics Columns,,
Column Names,"first_name, actor_full_name, last_update, last_name, actor_id, actor_key",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_actor LIMIT 5

actor_key,actor_id,first_name,last_name,last_update,actor_full_name
1,1,PENELOPE,GUINESS,2006-02-15T04:34:33Z,PENELOPE GUINESS
2,2,NICK,WAHLBERG,2006-02-15T04:34:33Z,NICK WAHLBERG
3,3,ED,CHASE,2006-02-15T04:34:33Z,ED CHASE
4,4,JENNIFER,DAVIS,2006-02-15T04:34:33Z,JENNIFER DAVIS
5,5,JOHNNY,LOLLOBRIGIDA,2006-02-15T04:34:33Z,JOHNNY LOLLOBRIGIDA


##### 1.4. Create a New Table that Sources dim_category from an Azure MySQL database.

In [0]:
%sql
--DIM_CATEGORY CREATION & POPULATION
-- Create a Temporary View named "view_dim_category" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_dim_category
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_category",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_category" using data from the view named "view_category"
CREATE OR REPLACE TABLE sakila_dlh.dim_category
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_dim_category;

-- -- Query the table
-- SELECT * FROM sakila_dlh.dim_category;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_category;

col_name,data_type,comment
category_key,int,
category_id,int,
name,varchar(25),
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"category_key, category_id, name, last_update",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_category LIMIT 5

category_key,category_id,name,last_update
1,1,Action,2006-02-15T04:46:27Z
2,2,Animation,2006-02-15T04:46:27Z
3,3,Children,2006-02-15T04:46:27Z
4,4,Classics,2006-02-15T04:46:27Z
5,5,Comedy,2006-02-15T04:46:27Z


##### 1.5. Create a New Table that Sources dim_customer from an Azure MySQL database.

In [0]:
%sql
--DIM_CUSTOMER CREATION & POPULATION
-- Create a Temporary View named "view_dim_customer" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_dim_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_customer",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_customer" using data from the view named "view_dim_customer"
CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_dim_customer;

-- -- Query the table
-- SELECT * FROM sakila_dlh.dim_customer;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,int,
customer_id,int,
store_id,int,
first_name,varchar(45),
last_name,varchar(45),
email,varchar(50),
address_id,int,
active,boolean,
create_date,timestamp,
last_update,timestamp,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,True,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


##### 1.6. Create a New Table that Sources dim_film from an Azure MySQL database.

In [0]:
%sql
--DIM_FILM CREATION & POPULATION
-- Create a Temporary View named "view_dim_film" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_dim_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_film",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_film" using data from the view named "view_dim_film"
CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_dim_film;

-- -- Query the table
-- SELECT * FROM sakila_dlh.dim_film;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,int,
film_id,int,
title,varchar(128),
description,varchar(65535),
release_year,date,
language_id,int,
original_language_id,int,
rental_duration,int,
rental_rate,"decimal(4,2)",
length,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006-01-01,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15T05:03:42Z
2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006-01-01,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z
3,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006-01-01,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15T05:03:42Z
4,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006-01-01,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15T05:03:42Z
5,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006-01-01,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15T05:03:42Z


##### 1.7. Create a New Table that Sources dim_store from an Azure MySQL database.

In [0]:
%sql
--DIM_STORE CREATION & POPULATION
-- Create a Temporary View named "view_dim_store" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_dim_store
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_store",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_store" using data from the view named "view_dim_store"
CREATE OR REPLACE TABLE sakila_dlh.dim_store
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_dim_store;

-- -- Query the table
-- SELECT * FROM sakila_dlh.dim_store;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store;

col_name,data_type,comment
store_key,int,
store_id,int,
manager_staff_id,int,
address_id,int,
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"store_id, address_id, manager_staff_id, last_update, store_key",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5

store_key,store_id,manager_staff_id,address_id,last_update
1,1,1,1,2006-02-15T04:57:12Z
2,2,2,2,2006-02-15T04:57:12Z


##### 1.8. Create a New Table that Sources fact_rental_payment from an Azure MySQL database.

In [0]:
%sql
--fact_rental_payment CREATION & POPULATION
-- Create a Temporary View named "view_fact_rental" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_fact_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aww5kx-mysql2.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "fact_rental_payment",
  user "aww5kx",    --Replace with your User Name
  password "passw0!d"  --Replace with you password
)

In [0]:
%sql
-- Enable Unified Catalog
SET spark.databricks.sql.catalog.enabled = true;

-- Use the database
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_store" using data from the view named "view_fact_rental"
CREATE OR REPLACE TABLE sakila_dlh.fact_rental_payment
COMMENT "Category Dimension Table"
LOCATION "dbfs:/FileStore/data"
AS SELECT * FROM view_fact_rental;

-- -- Query the table
-- SELECT * FROM sakila_dlh.fact_rental_payment;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_payment;

col_name,data_type,comment
rental_id,int,
payment_id,int,
customer_id,int,
staff_id,int,
inventory_id,bigint,
amount,"decimal(5,2)",
rental_date,timestamp,
return_date,timestamp,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.fact_rental_payment LIMIT 5

rental_id,payment_id,customer_id,staff_id,inventory_id,amount,rental_date,return_date
1,3504,130,1,367,2.99,2005-05-24T22:53:30Z,2005-05-26T22:04:30Z
2,12377,459,1,1525,2.99,2005-05-24T22:54:33Z,2005-05-28T19:40:33Z
3,11032,408,1,1711,3.99,2005-05-24T23:03:39Z,2005-06-01T22:12:39Z
4,8987,333,2,2452,4.99,2005-05-24T23:04:41Z,2005-06-03T01:43:41Z
5,6003,222,1,2079,6.99,2005-05-24T23:05:21Z,2005-06-02T04:33:21Z


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# Define the batch_dir variable
batch_dir = 'dbfs:/FileStore/data'

# Display the contents of the batch_dir directory
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/data/_delta_log/,_delta_log/,0,1715030034000
dbfs:/FileStore/data/part-00000-030dbe08-fdb2-4445-905e-902543702bb2.c000.snappy.parquet,part-00000-030dbe08-fdb2-4445-905e-902543702bb2.c000.snappy.parquet,52132,1715033974000
dbfs:/FileStore/data/part-00000-0bd42d8f-3550-4826-8e3f-17e4873cc358.c000.snappy.parquet,part-00000-0bd42d8f-3550-4826-8e3f-17e4873cc358.c000.snappy.parquet,52132,1715030703000
dbfs:/FileStore/data/part-00000-132a8bc0-5678-40db-9eaa-16a749c55647.c000.snappy.parquet,part-00000-132a8bc0-5678-40db-9eaa-16a749c55647.c000.snappy.parquet,1557,1715041759000
dbfs:/FileStore/data/part-00000-14cbd8f7-a3d3-4513-90bd-6b397fa4ad5f.c000.snappy.parquet,part-00000-14cbd8f7-a3d3-4513-90bd-6b397fa4ad5f.c000.snappy.parquet,1557,1715031375000
dbfs:/FileStore/data/part-00000-2c91bdc0-e733-4630-bb26-bee44475bb76.c000.snappy.parquet,part-00000-2c91bdc0-e733-4630-bb26-bee44475bb76.c000.snappy.parquet,27464,1715033540000
dbfs:/FileStore/data/part-00000-32bd16d7-3774-4379-be6b-3a96d173ba36.c000.snappy.parquet,part-00000-32bd16d7-3774-4379-be6b-3a96d173ba36.c000.snappy.parquet,8352,1715032051000
dbfs:/FileStore/data/part-00000-41567023-bb6a-4767-8c81-28ecdbe275bf.c000.snappy.parquet,part-00000-41567023-bb6a-4767-8c81-28ecdbe275bf.c000.snappy.parquet,8352,1715032028000
dbfs:/FileStore/data/part-00000-4550a3fa-7c20-4c1e-ab33-fafd9e178be1.c000.snappy.parquet,part-00000-4550a3fa-7c20-4c1e-ab33-fafd9e178be1.c000.snappy.parquet,8352,1715032425000
dbfs:/FileStore/data/part-00000-4717505d-9837-4be1-9e87-5c94114794d4.c000.snappy.parquet,part-00000-4717505d-9837-4be1-9e87-5c94114794d4.c000.snappy.parquet,1466,1715033551000


##### 2.3.1. USING THE JSON FILES WE PRELOADED TO MONGODB, Fetch Some Data Tables from the MongoDB Collection Created

In [0]:
%scala
import com.mongodb.spark._

val userName = "herinseo03"         //replace
val pwd = "q9BZnYnqWdgHq4T5"        //replace
val clusterName = "sandbox.ttnfyow" //replace
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala
// creating and saving df_actor table extracted from actor in atlas
val df_actor = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "actor")
  .load()
  .select("_id", "actor_id", "first_name", "last_name", "last_update")

display(df_actor)
df_actor.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_actor")

_id,actor_id,first_name,last_name,last_update
List(660db4219964b6185fda4b43),1,PENELOPE,GUINESS,2006-02-15 04:34:33
List(660db4219964b6185fda4b44),2,NICK,WAHLBERG,2006-02-15 04:34:33
List(660db4219964b6185fda4b45),3,ED,CHASE,2006-02-15 04:34:33
List(660db4219964b6185fda4b46),4,JENNIFER,DAVIS,2006-02-15 04:34:33
List(660db4219964b6185fda4b47),5,JOHNNY,LOLLOBRIGIDA,2006-02-15 04:34:33
List(660db4219964b6185fda4b48),6,BETTE,NICHOLSON,2006-02-15 04:34:33
List(660db4219964b6185fda4b49),7,GRACE,MOSTEL,2006-02-15 04:34:33
List(660db4219964b6185fda4b4a),8,MATTHEW,JOHANSSON,2006-02-15 04:34:33
List(660db4219964b6185fda4b4b),9,JOE,SWANK,2006-02-15 04:34:33
List(660db4219964b6185fda4b4c),10,CHRISTIAN,GABLE,2006-02-15 04:34:33


In [0]:
%scala
// creating and saving df_category table extracted from category in atlas
val df_category = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "category")
  .load()
  .select("_id", "category_id", "name", "last_update")

display(df_category)
df_category.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_category")

_id,category_id,name,last_update
List(660db4229964b6185fda4c0b),1,Action,2006-02-15 04:46:27
List(660db4229964b6185fda4c0c),2,Animation,2006-02-15 04:46:27
List(660db4229964b6185fda4c0d),3,Children,2006-02-15 04:46:27
List(660db4229964b6185fda4c0e),4,Classics,2006-02-15 04:46:27
List(660db4229964b6185fda4c0f),5,Comedy,2006-02-15 04:46:27
List(660db4229964b6185fda4c10),6,Documentary,2006-02-15 04:46:27
List(660db4229964b6185fda4c11),7,Drama,2006-02-15 04:46:27
List(660db4229964b6185fda4c12),8,Family,2006-02-15 04:46:27
List(660db4229964b6185fda4c13),9,Foreign,2006-02-15 04:46:27
List(660db4229964b6185fda4c14),10,Games,2006-02-15 04:46:27


In [0]:
%scala
// creating and saving df_city table extracted from city in atlas
val df_city = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "city")
  .load()
  .select("_id", "city_id", "city", "country_id", "last_update")

display(df_city)
df_city.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_city")

_id,city_id,city,country_id,last_update
List(660db4229964b6185fda4c1b),1,A Coruña (La Coruña),87,2006-02-15 04:45:25
List(660db4229964b6185fda4c1c),2,Abha,82,2006-02-15 04:45:25
List(660db4229964b6185fda4c1d),3,Abu Dhabi,101,2006-02-15 04:45:25
List(660db4229964b6185fda4c1e),4,Acuña,60,2006-02-15 04:45:25
List(660db4229964b6185fda4c1f),5,Adana,97,2006-02-15 04:45:25
List(660db4229964b6185fda4c20),6,Addis Abeba,31,2006-02-15 04:45:25
List(660db4229964b6185fda4c21),7,Aden,107,2006-02-15 04:45:25
List(660db4229964b6185fda4c22),8,Adoni,44,2006-02-15 04:45:25
List(660db4229964b6185fda4c23),9,Ahmadnagar,44,2006-02-15 04:45:25
List(660db4229964b6185fda4c24),10,Akishima,50,2006-02-15 04:45:25


In [0]:
%scala
// creating and saving df_country table extracted from country in atlas
val df_country = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "country")
  .load()
  .select("_id", "country_id", "country", "last_update")

display(df_country)
df_country.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_country")

_id,country_id,country,last_update
List(660db4249964b6185fda4e73),1,Afghanistan,2006-02-15 04:44:00
List(660db4249964b6185fda4e74),2,Algeria,2006-02-15 04:44:00
List(660db4249964b6185fda4e75),3,American Samoa,2006-02-15 04:44:00
List(660db4249964b6185fda4e76),4,Angola,2006-02-15 04:44:00
List(660db4249964b6185fda4e77),5,Anguilla,2006-02-15 04:44:00
List(660db4249964b6185fda4e78),6,Argentina,2006-02-15 04:44:00
List(660db4249964b6185fda4e79),7,Armenia,2006-02-15 04:44:00
List(660db4249964b6185fda4e7a),8,Australia,2006-02-15 04:44:00
List(660db4249964b6185fda4e7b),9,Austria,2006-02-15 04:44:00
List(660db4249964b6185fda4e7c),10,Azerbaijan,2006-02-15 04:44:00


In [0]:
%scala
// creating and saving df_customer table extracted from customer in atlas
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "customer")
  .load()
  .select("_id", "customer_id", "store_id", "first_name", "last_name", "email", "address_id", "active", "create_date", "last_update")

display(df_customer)

df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_customer")

_id,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
List(660db4259964b6185fda4ee0),1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee1),2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee2),3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee3),4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee4),5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee5),6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee6),7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee7),8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee8),9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1,2006-02-14 22:04:36,2006-02-15 04:57:20
List(660db4259964b6185fda4ee9),10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
// creating and saving df_film table extracted from film in atlas
val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "film")
  .load()
  .select("_id", "film_id", "title", "description", "release_year", "language_id", "original_language_id", "rental_duration", "rental_rate", "length", "replacement_cost", "rating", "special_features", "last_update")

display(df_film)

df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_film")

_id,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
List(660db43b9964b6185fda6981),1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
List(660db43b9964b6185fda6982),2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42
List(660db43b9964b6185fda6983),3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42
List(660db43b9964b6185fda6984),4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 05:03:42
List(660db43b9964b6185fda6985),5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 05:03:42
List(660db43b9964b6185fda6986),6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,,3,2.99,169,17.99,PG,Deleted Scenes,2006-02-15 05:03:42
List(660db43b9964b6185fda6987),7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes",2006-02-15 05:03:42
List(660db43b9964b6185fda6988),8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,,6,4.99,54,15.99,R,Trailers,2006-02-15 05:03:42
List(660db43b9964b6185fda6989),9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes",2006-02-15 05:03:42
List(660db43b9964b6185fda698a),10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [0]:
%scala
// creating and saving df_dimactor table extracted from dim_actor in atlas
val df_dimactor = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "dim_actor")
  .load()
  .select("_id", "actor_key", "actor_id", "first_name", "last_name", "last_update", "actor_full_name")

df_dimactor.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_actor_spark")

In [0]:
%scala
// creating and saving df_rental table extracted from rental in atlas
val df_rental = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "rental")
  .load()
  .select("_id", "rental_id", "rental_date", "inventory_id", "customer_id", "return_date", "staff_id", "last_update")

df_rental.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_rental")

In [0]:
%scala
// creating and saving df_payment table extracted from payment in atlas
val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "sakila-db-json")
  .option("collection", "payment")
  .load()
  .select("_id", "payment_id", "customer_id", "staff_id", "rental_id", "amount", "payment_date", "last_update")

df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.df_payment")

##### 2.4. Use the Spark DataFrame to Create a New Date Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_dimactor.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_actor_spark")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_actor_spark

col_name,data_type,comment
_id,struct,
actor_key,int,
actor_id,int,
first_name,string,
last_name,string,
last_update,string,
actor_full_name,string,
,,
# Delta Statistics Columns,,
Column Names,"first_name, _id, actor_full_name, last_update, last_name, actor_id, actor_key",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_actor_spark LIMIT 5

_id,actor_key,actor_id,first_name,last_name,last_update,actor_full_name
List(660db4279964b6185fda5137),1,1,PENELOPE,GUINESS,2006-02-15 04:34:33,PENELOPE GUINESS
List(660db4279964b6185fda5138),2,2,NICK,WAHLBERG,2006-02-15 04:34:33,NICK WAHLBERG
List(660db4279964b6185fda5139),3,3,ED,CHASE,2006-02-15 04:34:33,ED CHASE
List(660db4279964b6185fda513a),4,4,JENNIFER,DAVIS,2006-02-15 04:34:33,JENNIFER DAVIS
List(660db4279964b6185fda513b),5,5,JOHNNY,LOLLOBRIGIDA,2006-02-15 04:34:33,JOHNNY LOLLOBRIGIDA


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File
#####  *Upload local CSV file ("fact_rental_payment.csv") provided from GitHub to DBFS manually by pressing the "+" button on the left drop down menu, add data via dbfs and create it through the databricks ui, copy paste the absolute path that was created to file_path below.*

In [0]:
file_path = "dbfs:/FileStore/tables/fact_rental_payment.csv"
df_newfact = spark.read.csv(file_path, header=True, inferSchema=True)

In [0]:
df_newfact.printSchema()

root
 |-- rental_id: integer (nullable = true)
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- inventory_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- rental_date: timestamp (nullable = true)
 |-- return_date: timestamp (nullable = true)



In [0]:
df_newfact.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.new_fact_rental_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.new_fact_rental_payment;

col_name,data_type,comment
rental_id,int,
payment_id,int,
customer_id,int,
staff_id,int,
inventory_id,int,
amount,double,
rental_date,timestamp,
return_date,timestamp,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.new_fact_rental_payment LIMIT 5;

rental_id,payment_id,customer_id,staff_id,inventory_id,amount,rental_date,return_date
1,3504,130,1,367,2.99,2005-05-24T22:53:30Z,2005-05-26T22:04:30Z
2,12377,459,1,1525,2.99,2005-05-24T22:54:33Z,2005-05-28T19:40:33Z
3,11032,408,1,1711,3.99,2005-05-24T23:03:39Z,2005-06-01T22:12:39Z
4,8987,333,2,2452,4.99,2005-05-24T23:04:41Z,2005-06-03T01:43:41Z
5,6003,222,1,2079,6.99,2005-05-24T23:05:21Z,2005-06-02T04:33:21Z


##### Verify Tables in sakila_dlh

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_actor,False
sakila_dlh,dim_actor_spark,False
sakila_dlh,dim_category,False
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_date_new,False
sakila_dlh,dim_film,False
sakila_dlh,dim_store,False
sakila_dlh,fact_rental_payment,False
sakila_dlh,new_fact_rental_payment,False


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Rental Payment Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data
##### *Upload the fact_rental_payment_2.json file from GitHub submission to dbfs*

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_id INT, payment_id INT, customer_id INT, staff_id INT, inventory_id INT, amount DOUBLE, rental_date TIMESTAMP, return_date TIMESTAMP")
 .option("cloudFiles.schemaLocation", "dbfs:/FileStore/data/sakila_dlh/fact_rental_payment_2_json") # replace with your dbfs file path for the json file uploaded
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load("dbfs:/FileStore/tables")  # update this path to your JSON stream directory for sakila data
 .createOrReplaceTempView("sakila_rental_payment_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW fact_rent_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM sakila_rental_payment_raw_tempview
)

In [0]:
%sql
SELECT * FROM fact_rent_bronze_tempview

CurrentDay,CurrentMonth,CurrentQuater,CurrentWeek,CurrentYear,DOWInMonth,Date,DateKey,Day,DayOfYear,DaySuffix,FinancialMonth,FinancialQuater,FinancialYear,FirstDateofMonth,FirstDateofQuater,FirstDateofWeek,FirstDateofYear,HolidayName,IsHoliday,IsWeekend,LastDateofMonth,LastDateofQuater,LastDateofWeek,LastDateofYear,MMYYYY,Month,MonthName,MonthName_FirstLetter,MonthName_Short,MonthYear,Quarter,QuarterName,SpecialDays,WeekDayName,WeekDayName_FirstLetter,WeekDayName_Short,WeekOfMonth,WeekOfYear,Weekday,Year,active,actor_full_name,actor_id,actor_key,address_id,amount,category_id,category_key,city,city_id,country,country_id,create_date,customer_id,description,email,film_id,film_key,first_name,inventory_id,language_id,last_name,last_update,length,manager_staff_id,name,original_language_id,payment_date,payment_id,rating,release_year,rental_date,rental_duration,rental_id,rental_rate,replacement_cost,return_date,special_features,staff_id,store_id,store_key,title,_rescued_data,receipt_time,source_file
92,4,2,14,2024,1,2000-01-01,20000101,1,1,st,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Saturday,S,SAT,0,52,7,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,2,2000-01-02,20000102,2,2,nd,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Sunday,S,SUN,0,52,1,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,3,2000-01-03,20000103,3,3,rd,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Monday,M,MON,1,1,2,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,4,2000-01-04,20000104,4,4,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Tuesday,T,TUE,1,1,3,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,5,2000-01-05,20000105,5,5,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Wednesday,W,WED,1,1,4,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,6,2000-01-06,20000106,6,6,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Thursday,T,THU,1,1,5,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,7,2000-01-07,20000107,7,7,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Friday,F,FRI,1,1,6,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,8,2000-01-08,20000108,8,8,th,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Saturday,S,SAT,1,1,7,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,9,2000-01-09,20000109,9,9,th,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Sunday,S,SUN,1,1,1,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json
92,4,2,14,2024,10,2000-01-10,20000110,10,10,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Monday,M,MON,2,2,2,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2024-05-07T19:27:29.538Z,dbfs:/FileStore/tables/s/DimDate.json


In [0]:
(spark.table("sakila_rental_payment_raw_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentpay_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5a3c1542b0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentpay_bronze")
  .createOrReplaceTempView("rent_pay_silver_tempview"))

In [0]:
%sql
SELECT * FROM rent_pay_silver_tempview

CurrentDay,CurrentMonth,CurrentQuater,CurrentWeek,CurrentYear,DOWInMonth,Date,DateKey,Day,DayOfYear,DaySuffix,FinancialMonth,FinancialQuater,FinancialYear,FirstDateofMonth,FirstDateofQuater,FirstDateofWeek,FirstDateofYear,HolidayName,IsHoliday,IsWeekend,LastDateofMonth,LastDateofQuater,LastDateofWeek,LastDateofYear,MMYYYY,Month,MonthName,MonthName_FirstLetter,MonthName_Short,MonthYear,Quarter,QuarterName,SpecialDays,WeekDayName,WeekDayName_FirstLetter,WeekDayName_Short,WeekOfMonth,WeekOfYear,Weekday,Year,active,actor_full_name,actor_id,actor_key,address_id,amount,category_id,category_key,city,city_id,country,country_id,create_date,customer_id,description,email,film_id,film_key,first_name,inventory_id,language_id,last_name,last_update,length,manager_staff_id,name,original_language_id,payment_date,payment_id,rating,release_year,rental_date,rental_duration,rental_id,rental_rate,replacement_cost,return_date,special_features,staff_id,store_id,store_key,title,_rescued_data
92,4,2,14,2024,1,2000-01-01,20000101,1,1,st,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Saturday,S,SAT,0,52,7,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,2,2000-01-02,20000102,2,2,nd,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Sunday,S,SUN,0,52,1,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,3,2000-01-03,20000103,3,3,rd,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Monday,M,MON,1,1,2,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,4,2000-01-04,20000104,4,4,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Tuesday,T,TUE,1,1,3,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,5,2000-01-05,20000105,5,5,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Wednesday,W,WED,1,1,4,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,6,2000-01-06,20000106,6,6,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Thursday,T,THU,1,1,5,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,7,2000-01-07,20000107,7,7,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Friday,F,FRI,1,1,6,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,8,2000-01-08,20000108,8,8,th,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Saturday,S,SAT,1,1,7,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,9,2000-01-09,20000109,9,9,th,,,,,,,,,0,1,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Sunday,S,SUN,1,1,1,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
92,4,2,14,2024,10,2000-01-10,20000110,10,10,th,,,,,,,,,0,0,,,,,12000,1,January,J,JAN,2000JAN,1,First,,Monday,M,MON,2,2,2,2000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
%sql
DESCRIBE EXTENDED rent_pay_silver_tempview

col_name,data_type,comment
CurrentDay,bigint,
CurrentMonth,bigint,
CurrentQuater,bigint,
CurrentWeek,bigint,
CurrentYear,bigint,
DOWInMonth,bigint,
Date,string,
DateKey,bigint,
Day,bigint,
DayOfYear,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW rent_pay_silver_tempview AS (
  SELECT r.rental_id,
      r.customer_id,
      c.customer_id AS customer_oid,
      r.inventory_id,
      p.amount AS amount_paid,
      r.rental_date,
      r.return_date,
      r.staff_id,
      p.payment_id,
      p.payment_date AS payment_date_key
  FROM sakila_dlh.df_rental AS r
  INNER JOIN sakila_dlh.dim_customer AS c 
  ON c.customer_id = r.customer_id
  INNER JOIN sakila_dlh.df_payment AS p
  ON p.rental_id = r.rental_id
)

In [0]:
(spark.table("rent_pay_silver_tempview")
      .write
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .mode("append")
      .saveAsTable("fact_rent_pay_silver"))

In [0]:
%sql
SELECT * FROM fact_rent_pay_silver

rental_id,customer_id,customer_oid,inventory_id,amount_paid,rental_date,return_date,staff_id,payment_id,payment_date_key
696,19,19,2076,2.99,2005-05-29 01:59:10,2005-06-01 02:45:10,1,495,2005-05-29 01:59:10
975,7,7,3109,4.99,2005-05-30 21:07:15,2005-06-03 01:48:15,2,177,2005-05-30 21:07:15
424,35,35,2815,6.99,2005-05-27 15:34:01,2005-06-05 09:44:01,1,953,2005-05-27 15:34:01
692,18,18,800,4.99,2005-05-29 01:32:10,2005-06-02 03:54:10,2,470,2005-05-29 01:32:10
916,6,6,1290,0.99,2005-05-30 11:25:01,2005-05-31 09:06:01,1,148,2005-05-30 11:25:01
573,1,1,4020,0.99,2005-05-28 10:35:23,2005-06-03 06:32:23,1,2,2005-05-28 10:35:23
90,25,25,2984,7.99,2005-05-25 14:31:25,2005-06-01 10:07:25,1,656,2005-05-25 14:31:25
696,19,19,2076,2.99,2005-05-29 01:59:10,2005-06-01 02:45:10,1,495,2005-05-29 01:59:10
692,18,18,800,4.99,2005-05-29 01:32:10,2005-06-02 03:54:10,2,470,2005-05-29 01:32:10
975,7,7,3109,4.99,2005-05-30 21:07:15,2005-06-03 01:48:15,2,177,2005-05-30 21:07:15


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rent_pay_silver

col_name,data_type,comment
rental_id,int,
customer_id,int,
customer_oid,int,
inventory_id,int,
amount_paid,double,
rental_date,string,
return_date,string,
staff_id,int,
payment_id,int,
payment_date_key,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the number of rentals sold per customer each Month, along with the Customers' ID and the Month in which the item was rented.

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rentals_by_customer_gold AS (
  SELECT customer_id AS CustomerID
    , extract(month from rental_date) AS RentalMonth
    , COUNT(inventory_id) AS RentalCount
  FROM sakila_dlh.fact_rent_pay_silver
  GROUP BY CustomerID, RentalMonth
  ORDER BY RentalCount DESC);

SELECT * FROM sakila_dlh.fact_monthly_rentals_by_customer_gold;

CustomerID,RentalMonth,RentalCount
19,5,144
14,5,140
7,5,132
21,5,105
20,5,90
6,5,84
16,5,84
18,5,66
22,5,66
28,5,64


In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.fact_monthly_rentals_by_customer_gold AS (
  SELECT pc.CustomerID
    , os.rental_id AS ProductNumber
    , pc.ProductCount
  FROM sakila_dlh.fact_rent_pay_silver AS os
  INNER JOIN (
    SELECT customer_id AS CustomerID
    , COUNT(rental_id) AS ProductCount
    FROM sakila_dlh.fact_rent_pay_silver
    WHERE return_date IS NULL -- Assuming return_date column exists and modification is for non-returned orders
    GROUP BY customer_id
  ) AS pc
ON pc.CustomerID = os.customer_id  ORDER BY ProductCount DESC);

SELECT * FROM sakila_dlh.fact_monthly_rentals_by_customer_gold;

CustomerID,ProductNumber,ProductCount


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/data/