In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [None]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "tuh8gz.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw2"

connection_properties = {
  "user" : "tuh8gz",
  "password" : "MyPasscode123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox"
atlas_database_name = "sakila_dw2"
atlas_user_name = "dianatuyen"
atlas_password = "MyPasscode123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/final_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

#dbutils.fs.rm(f"{database_dir}/fact_rentals", True)
#dbutils.fs.rm(database_dir, True)

In [None]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()

    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [None]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://tuh8gz.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_date",
  user "tuh8gz",
  password "MyPasscode123"
)

In [None]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [None]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_inventory
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://tuh8gz.mysql.database.azure.com:3306/sakila_dw2",
  dbtable "dim_inventory",
  user "tuh8gz",
  password "MyPasscode123"
)

In [None]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_inventory" using data from the view named "view_inventory"
CREATE OR REPLACE TABLE sakila_dlh.dim_inventory
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/northwind_dlh/dim_inventory"
AS SELECT * FROM view_inventory

num_affected_rows,num_inserted_rows


In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_id,bigint,
film_id,bigint,
store_id,bigint,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,EXTERNAL,
Comment,Inventory Dimension Table,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [None]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimCustomers.csv,sakila_dimCustomers.csv,13042,1683233857000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimCustomers.json,sakila_dimCustomers.json,69302,1683231390000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimFilm.csv,sakila_dimFilm.csv,34146,1683233857000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimFilm.json,sakila_dimFilm.json,134090,1683227727000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimInventory.csv,sakila_dimInventory.csv,55307,1683233857000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimInventory.json,sakila_dimInventory.json,175238,1683227726000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimPayment.csv,sakila_dimPayment.csv,84461,1683233857000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimPayment.json,sakila_dimPayment.json,132950,1683227726000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimRental.csv,sakila_dimRental.csv,84461,1683233857000
dbfs:/FileStore/ds2002-lab06/final_data/batch/sakila_dimRentals.json,sakila_dimRentals.json,174475,1683233416000


In [None]:
source_dir = '/dbfs/FileStore/lab_data/final_data/batch'
json_files = {"customers" : 'sakila_dimCustomers.json',  "payment" : 'sakila_dimPayment.json', "rental" : 'sakila_dimRentals.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

Out[16]: <pymongo.results.InsertManyResult at 0x7fbeb073b980>

In [None]:
%scala
import com.mongodb.spark._

val userName = "dianatuyen"
val pwd = "MyPasscode123"
val clusterName = "sandbox.xxbor"
val atlas_uri = "mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [None]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "customers").load()
.select("customer_id","store_id","last_name","first_name","active")

display(df_customer)

customer_id,store_id,last_name,first_name,active
1,1,SMITH,MARY,1
2,1,JOHNSON,PATRICIA,1
3,1,WILLIAMS,LINDA,1
4,2,JONES,BARBARA,1
5,1,BROWN,ELIZABETH,1
6,2,DAVIS,JENNIFER,1
7,1,MILLER,MARIA,1
8,2,WILSON,SUSAN,1
9,2,MOORE,MARGARET,1
10,1,TAYLOR,DOROTHY,1


In [None]:
%scala
df_customer.printSchema()

In [None]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_id,int,
store_id,int,
last_name,string,
first_name,string,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customer,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_id,store_id,last_name,first_name,active
1,1,SMITH,MARY,1
2,1,JOHNSON,PATRICIA,1
3,1,WILLIAMS,LINDA,1
4,2,JONES,BARBARA,1
5,1,BROWN,ELIZABETH,1


In [None]:
%scala
import com.mongodb.spark._

val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "payment").load()
.select("payment_id", "customer_id","rental_id","amount","payment_date")

display(df_customer)

customer_id,store_id,last_name,first_name,active
1,1,SMITH,MARY,1
2,1,JOHNSON,PATRICIA,1
3,1,WILLIAMS,LINDA,1
4,2,JONES,BARBARA,1
5,1,BROWN,ELIZABETH,1
6,2,DAVIS,JENNIFER,1
7,1,MILLER,MARIA,1
8,2,WILSON,SUSAN,1
9,2,MOORE,MARGARET,1
10,1,TAYLOR,DOROTHY,1


In [None]:
%scala
df_payment.printSchema()

In [None]:
%scala
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment

col_name,data_type,comment
payment_id,int,
customer_id,int,
rental_id,int,
amount,double,
payment_date,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_payment,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5

payment_id,customer_id,rental_id,amount,payment_date
1,1,76,2.99,2005-05-25 11:30:37
2,1,573,0.99,2005-05-28 10:35:23
3,1,1185,5.99,2005-06-15 00:54:12
4,1,1422,0.99,2005-06-15 18:02:53
5,1,1476,9.99,2005-06-15 21:08:46


##### 2.5.1 Fetch Invoice Dimension Data from the New MongoDB Collection

In [None]:
%scala
import com.mongodb.spark._

val df_rental = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw2").option("collection", "rental").load()
.select("rental_id","rental_date","inventory_id","customer_id","return_date", "staff_id")

display(df_rental)

rental_id,rental_date,inventory_id,customer_id,return_date,staff_id
1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1
3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1
4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2
5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1
6,2005-05-24 23:08:07,2792,549,2005-05-27 01:32:07,1
7,2005-05-24 23:11:53,3995,269,2005-05-29 20:34:53,2
8,2005-05-24 23:31:46,2346,239,2005-05-27 23:33:46,2
9,2005-05-25 00:00:40,2580,126,2005-05-28 00:22:40,1
10,2005-05-25 00:02:21,1824,399,2005-05-31 22:44:21,2


In [None]:
%scala
df_rental.printSchema()

In [None]:
%scala
df_rental.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_rental")

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rental

col_name,data_type,comment
rental_id,int,
rental_date,string,
inventory_id,int,
customer_id,int,
return_date,string,
staff_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_rental LIMIT 5

rental_id,rental_date,inventory_id,customer_id,return_date,staff_id
1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1
3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1
4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2
5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [None]:
stores_csv = f"{batch_dir}/sakila_dimStore.csv"

df_stores = spark.read.format('csv').options(header='true', inferSchema='true').load(stores_csv)
display(df_stores)

store_id,address_id
1,1
2,2


In [None]:
df_stores.printSchema()

root
 |-- store_id: integer (nullable = true)
 |-- address_id: integer (nullable = true)



In [None]:
df_stores.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_stores")

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_stores;

col_name,data_type,comment
store_id,int,
address_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_stores,
Type,MANAGED,
Location,dbfs:/FileStore/ds2002-lab06/sakila_dlh/dim_stores,
Provider,delta,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_stores LIMIT 5;

store_id,address_id
1,1
2,2


In [None]:
film_csv = f"{batch_dir}/sakila_dimFilm.csv"

df_film = spark.read.format('csv').options(header='true', inferSchema='true').load(film_csv)
display(df_film)

film_id,title,rental_duration,rental_rate,replacement_cost
1,ACADEMY DINOSAUR,6,0.99,20.99
2,ACE GOLDFINGER,3,4.99,12.99
3,ADAPTATION HOLES,7,2.99,18.99
4,AFFAIR PREJUDICE,5,2.99,26.99
5,AFRICAN EGG,6,2.99,22.99
6,AGENT TRUMAN,3,2.99,17.99
7,AIRPLANE SIERRA,6,4.99,28.99
8,AIRPORT POLLOCK,6,4.99,15.99
9,ALABAMA DEVIL,3,2.99,21.99
10,ALADDIN CALENDAR,6,4.99,24.99


In [None]:
df_film.printSchema()

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- replacement_cost: double (nullable = true)



In [None]:
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_id,int,
title,string,
rental_duration,int,
rental_rate,double,
replacement_cost,double,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_film,


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5;

film_id,title,rental_duration,rental_rate,replacement_cost
1,ACADEMY DINOSAUR,6,0.99,20.99
2,ACE GOLDFINGER,3,4.99,12.99
3,ADAPTATION HOLES,7,2.99,18.99
4,AFFAIR PREJUDICE,5,2.99,26.99
5,AFRICAN EGG,6,2.99,22.99


In [None]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_rental,False
sakila_dlh,dim_stores,False
,view_date,True
,view_inventory,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [None]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_id BIGINT")
 .option("cloudFiles.schemaHints", "rental_date STRING")
 .option("cloudFiles.schemaHints", "inventory_id BIGINT")
 .option("cloudFiles.schemaHints", "customer_id BIGINT")
 .option("cloudFiles.schemaHints", "return_date STRING")
 .option("cloudFiles.schemaHints", "staff_id BIGINT")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [None]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [None]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_id,inventory_id,last_update,rental_date,rental_id,return_date,staff_id,_rescued_data,receipt_time,source_file
130,367,2006-02-15 21:30:53,2005-05-24 22:53:30,1,2005-05-26 22:04:30,1,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
459,1525,2006-02-15 21:30:53,2005-05-24 22:54:33,2,2005-05-28 19:40:33,1,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
408,1711,2006-02-15 21:30:53,2005-05-24 23:03:39,3,2005-06-01 22:12:39,1,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
333,2452,2006-02-15 21:30:53,2005-05-24 23:04:41,4,2005-06-03 01:43:41,2,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
222,2079,2006-02-15 21:30:53,2005-05-24 23:05:21,5,2005-06-02 04:33:21,1,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
549,2792,2006-02-15 21:30:53,2005-05-24 23:08:07,6,2005-05-27 01:32:07,1,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
269,3995,2006-02-15 21:30:53,2005-05-24 23:11:53,7,2005-05-29 20:34:53,2,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
239,2346,2006-02-15 21:30:53,2005-05-24 23:31:46,8,2005-05-27 23:33:46,2,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
126,2580,2006-02-15 21:30:53,2005-05-25 00:00:40,9,2005-05-28 00:22:40,1,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
399,1824,2006-02-15 21:30:53,2005-05-25 00:02:21,10,2005-05-31 22:44:21,2,,2023-05-07T17:19:03.741+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json


In [None]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[37]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fbeaa9b99d0>

In [None]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [None]:
%sql
SELECT * FROM rentals_silver_tempview

customer_id,inventory_id,last_update,rental_date,rental_id,return_date,staff_id,_rescued_data,receipt_time,source_file
130,367,2006-02-15 21:30:53,2005-05-24 22:53:30,1,2005-05-26 22:04:30,1,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
459,1525,2006-02-15 21:30:53,2005-05-24 22:54:33,2,2005-05-28 19:40:33,1,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
408,1711,2006-02-15 21:30:53,2005-05-24 23:03:39,3,2005-06-01 22:12:39,1,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
333,2452,2006-02-15 21:30:53,2005-05-24 23:04:41,4,2005-06-03 01:43:41,2,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
222,2079,2006-02-15 21:30:53,2005-05-24 23:05:21,5,2005-06-02 04:33:21,1,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
549,2792,2006-02-15 21:30:53,2005-05-24 23:08:07,6,2005-05-27 01:32:07,1,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
269,3995,2006-02-15 21:30:53,2005-05-24 23:11:53,7,2005-05-29 20:34:53,2,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
239,2346,2006-02-15 21:30:53,2005-05-24 23:31:46,8,2005-05-27 23:33:46,2,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
126,2580,2006-02-15 21:30:53,2005-05-25 00:00:40,9,2005-05-28 00:22:40,1,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json
399,1824,2006-02-15 21:30:53,2005-05-25 00:02:21,10,2005-05-31 22:44:21,2,,2023-05-07T17:20:26.843+0000,dbfs:/FileStore/ds2002-lab06/final_data/stream/rentals/sakila_rental.json


In [None]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
last_update,string,
rental_date,string,
rental_id,bigint,
return_date,string,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_id,
  r.customer_id,
  c.first_name,
  c.last_name,
  c.active,
  r.inventory_id,
  i.film_id,
  f.title,
  f.rental_duration,
  f.rental_rate,
  f.replacement_cost,
  r.last_update,
  r.return_date,
  retd.day_name_of_week AS returned_day_name_of_week,
  retd.day_of_month AS returned_day_of_month,
  retd.weekday_weekend AS returned_weekday_weekend,
  retd.month_name AS returned_month_name,
  retd.calendar_quarter AS returned_calendar_quarter,
  retd.calendar_year AS returned_calendar_year,
  r.rental_date,
  rend.day_name_of_week AS rental_day_name_of_week,
  rend.day_of_month AS rental_day_of_month,
  rend.weekday_weekend AS rental_weekday_weekend,
  rend.month_name AS rental_month_name,
  rend.calendar_quarter AS rental_calendar_quarter,
  rend.calendar_year AS rental_calendar_year,
  r.staff_id
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_id = r.customer_id
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_id = r.inventory_id
  INNER JOIN sakila_dlh.dim_film AS f
  ON f.film_id = i.film_id
  LEFT OUTER JOIN sakila_dlh.dim_payment AS p
   ON p.rental_id = r.rental_id
  LEFT OUTER JOIN sakila_dlh.dim_date AS rend
  ON rend.date_key = r.rental_date
  LEFT OUTER JOIN sakila_dlh.dim_date AS retd
  ON retd.date_key = r.return_date
)

In [None]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[50]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fbe94faf7c0>

In [None]:
%sql
SELECT * FROM fact_rentals_silver

rental_id,customer_id,first_name,last_name,active,inventory_id,film_id,title,rental_duration,rental_rate,replacement_cost,last_update,return_date,returned_day_name_of_week,returned_day_of_month,returned_weekday_weekend,returned_month_name,returned_calendar_quarter,returned_calendar_year,rental_date,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,staff_id
1,130,CHARLOTTE,HUNTER,1,367,80,BLANKET BEVERLY,7,2.99,21.99,2006-02-15 21:30:53,2005-05-26 22:04:30,,,,,,,2005-05-24 22:53:30,,,,,,,1
2,459,TOMMY,COLLAZO,1,1525,333,FREAKY POCUS,7,2.99,16.99,2006-02-15 21:30:53,2005-05-28 19:40:33,,,,,,,2005-05-24 22:54:33,,,,,,,1
3,408,MANUEL,MURRELL,1,1711,373,GRADUATE LORD,7,2.99,14.99,2006-02-15 21:30:53,2005-06-01 22:12:39,,,,,,,2005-05-24 23:03:39,,,,,,,1
4,333,ANDREW,PURDY,1,2452,535,LOVE SUICIDES,6,0.99,21.99,2006-02-15 21:30:53,2005-06-03 01:43:41,,,,,,,2005-05-24 23:04:41,,,,,,,2
5,222,DELORES,HANSEN,1,2079,450,IDOLS SNATCHERS,5,2.99,29.99,2006-02-15 21:30:53,2005-06-02 04:33:21,,,,,,,2005-05-24 23:05:21,,,,,,,1
6,549,NELSON,CHRISTENSON,1,2792,613,MYSTIC TRUMAN,5,0.99,19.99,2006-02-15 21:30:53,2005-05-27 01:32:07,,,,,,,2005-05-24 23:08:07,,,,,,,1
7,269,CASSANDRA,WALTERS,1,3995,870,SWARM GOLD,4,0.99,12.99,2006-02-15 21:30:53,2005-05-29 20:34:53,,,,,,,2005-05-24 23:11:53,,,,,,,2
8,239,MINNIE,ROMERO,1,2346,510,LAWLESS VISION,6,4.99,29.99,2006-02-15 21:30:53,2005-05-27 23:33:46,,,,,,,2005-05-24 23:31:46,,,,,,,2
9,126,ELLEN,SIMPSON,1,2580,565,MATRIX SNOWMAN,6,4.99,9.99,2006-02-15 21:30:53,2005-05-28 00:22:40,,,,,,,2005-05-25 00:00:40,,,,,,,1
10,399,DANNY,ISOM,1,1824,396,HANGING DEEP,5,4.99,18.99,2006-02-15 21:30:53,2005-05-31 22:44:21,,,,,,,2005-05-25 00:02:21,,,,,,,2


In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
rental_id,bigint,
customer_id,bigint,
first_name,string,
last_name,string,
active,int,
inventory_id,bigint,
film_id,bigint,
title,string,
rental_duration,int,
rental_rate,double,


In [None]:

%sql
SELECT SUM(replacement_cost) AS total_replacement_price,
first_name,
last_name,
film_id,
rental_duration,
rental_date,
return_date
FROM sakila_dlh.fact_rentals_silver
GROUP BY first_name,
last_name,
film_id,
rental_duration,
rental_date,
return_date
ORDER BY total_replacement_price DESC

total_replacement_price,first_name,last_name,film_id,rental_duration,rental_date,return_date
29.99,ARMANDO,GRUBER,429,3,2005-05-26 05:29:49,2005-05-28 10:10:49
29.99,RAY,HOULE,819,3,2005-05-25 04:01:32,2005-05-30 03:12:32
29.99,ARLENE,HARVEY,417,5,2005-05-30 18:52:53,2005-06-07 16:19:53
29.99,NAOMI,JENNINGS,803,7,2005-05-26 08:10:22,2005-05-27 03:55:22
29.99,ERIC,ROBERT,450,5,2005-05-31 00:06:20,2005-05-31 21:29:20
29.99,MINNIE,ROMERO,510,6,2005-05-24 23:31:46,2005-05-27 23:33:46
29.99,LUIS,YANEZ,358,5,2005-05-29 17:35:50,2005-06-04 17:05:50
29.99,ROBERTA,HARPER,417,5,2005-05-28 19:16:14,2005-05-31 16:50:14
29.99,FREDERICK,ISBELL,374,3,2005-05-29 17:24:48,2005-06-05 12:25:48
29.99,HEATHER,MORRIS,480,3,2005-05-25 14:13:54,2005-05-29 09:32:54
