DS 2002 Final Project 

Emma Singer

In this project, I used data from the sakila datawarehouse I created in my midterm.  I created the date dimension and the staff dimension in MySQL.  I downloaded a json file of inventory and pushed it up to mongodb to create the inventory dimension.  I downloaded a csv of customers and created the customer dimension in databrirks using the DBFS file system.  I downloaded my fact table illustrating the rentals process from MySQL into 3 separate json files to then upload them mimicking streaming of data.  Finally, I created bronze, silver, and gold tables, illustrating that my data lakehouse works.

Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ecs9ne-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "ecs9ne",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002.wba9jt7"
atlas_database_name = "sakila_dw"
atlas_user_name = "ecs9ne"
atlas_password = "Passw0rd123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-final"
database_dir = f"{base_dir}/{dst_database}"

batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"

rentals_stream_dir = f"{stream_dir}" #fact table data

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

Populate Dimensions by Ingesting Reference (Cold-path) Data

Fetch Reference Data From an Azure MySQL Database and Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ecs9ne-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_date",
  user "ecs9ne",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


Create a New Table that Sources Staff Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_staff
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ecs9ne-mysql.mysql.database.azure.com:3306/sakila_dw",
  dbtable "dim_staff",
  user "ecs9ne",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_staff
COMMENT "Staff Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_staff"
AS SELECT * FROM view_staff

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,bigint,
first_name,string,
last_name,string,
address_key,bigint,
email,string,
store_key,bigint,
active,bigint,
username,string,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5

staff_key,first_name,last_name,address_key,email,store_key,active,username
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon


Fetch Reference Data from a MongoDB Atlas Database and View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final/batch/customers.csv,customers.csv,67117,1682438388000
dbfs:/FileStore/ds2002-final/batch/inventory.json,inventory.json,241597,1682436809000


Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final/batch'
json_files = {"inventory" : 'inventory.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[15]: <pymongo.results.InsertManyResult at 0x7fadc534f0d0>

Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._


val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw").option("collection", "inventory").option("uri", "mongodb+srv://ecs9ne:Passw0rd123@ds2002.wba9jt7.mongodb.net/").load()
.select("inventory_key","film_key","store_key","title","release_year","rental_duration","rental_rate","length")

display(df_inventory)

inventory_key,film_key,store_key,title,release_year,rental_duration,rental_rate,length
1,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
2,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
3,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
4,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
5,1,2,ACADEMY DINOSAUR,2006,6,0.99,86
6,1,2,ACADEMY DINOSAUR,2006,6,0.99,86
7,1,2,ACADEMY DINOSAUR,2006,6,0.99,86
8,1,2,ACADEMY DINOSAUR,2006,6,0.99,86
9,2,2,ACE GOLDFINGER,2006,3,4.99,48
10,2,2,ACE GOLDFINGER,2006,3,4.99,48


In [0]:
%scala
df_inventory.printSchema()

Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_key,int,
film_key,int,
store_key,int,
title,string,
release_year,int,
rental_duration,int,
rental_rate,double,
length,int,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_key,film_key,store_key,title,release_year,rental_duration,rental_rate,length
1,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
2,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
3,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
4,1,1,ACADEMY DINOSAUR,2006,6,0.99,86
5,1,2,ACADEMY DINOSAUR,2006,6,0.99,86


Fetch Data from a File System and Use PySpark to Read From a CSV File (Creating Customer Dimension)

In [0]:
customers_csv = f"{batch_dir}/customers.csv"

df_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customers_csv)
display(df_customers)

customer_key,store_key,first_name,last_name,email,address_key,address,district,city_key,postal_code,phone
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1913 Hanoi Way,Nagasaki,463,35200,28303384290
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1121 Loja Avenue,California,449,17886,838635286649
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,692 Joliet Street,Attika,38,83579,448477190408
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1566 Inegl Manor,Mandalay,349,53561,705814003527
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,53 Idfu Parkway,Nantou,361,42399,10655648674
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1795 Santiago de Compostela Way,Texas,295,18743,860452626434
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,900 Santiago de Compostela Parkway,Central Serbia,280,93896,716571220373
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,478 Joliet Way,Hamilton,200,77948,657282285970
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,613 Korolev Drive,Masqat,329,45844,380657522649
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1531 Sal Drive,Esfahan,162,53628,648856936185


In [0]:
df_customers.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- store_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_key: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city_key: integer (nullable = true)
 |-- postal_code: integer (nullable = true)
 |-- phone: long (nullable = true)



In [0]:
df_customers.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customers")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customers;

col_name,data_type,comment
customer_key,int,
store_key,int,
first_name,string,
last_name,string,
email,string,
address_key,int,
address,string,
district,string,
city_key,int,
postal_code,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customers LIMIT 5;

customer_key,store_key,first_name,last_name,email,address_key,address,district,city_key,postal_code,phone
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1913 Hanoi Way,Nagasaki,463,35200,28303384290
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1121 Loja Avenue,California,449,17886,838635286649
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,692 Joliet Street,Attika,38,83579,448477190408
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1566 Inegl Manor,Mandalay,349,53561,705814003527
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,53 Idfu Parkway,Nantou,361,42399,10655648674


Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customers,False
sakila_dlh,dim_date,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_staff,False
,view_date,True
,view_staff,True


Integrate Reference Data with Real-Time Data; Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data

Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "staff_key BIGINT") 
 .option("cloudFiles.schemaHints", "payment_key BIGINT")
 .option("cloudFiles.schemaHints", "film_key BIGINT")
 .option("cloudFiles.schemaHints", "store_date_key BIGINT")
 .option("cloudFiles.schemaHints", "title STRING")
 .option("cloudFiles.schemaHints", "payment_amount DECIMAL") 
 .option("cloudFiles.schemaHints", "rental_rate DECIMAL")
 .option("cloudFiles.schemaHints", "replacement_cost DECIMAL")
 .option("cloudFiles.schemaHints", "customer_first_name STRING")
 .option("cloudFiles.schemaHints", "customer_last_name STRING")
 .option("cloudFiles.schemaHints", "rental_duration BIGINT")
 .option("cloudFiles.schemaHints", "rental_date_key BIGINT")
 .option("cloudFiles.schemaHints", "return_date_key BIGINT")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_first_name,customer_key,customer_last_name,film_key,inventory_key,payment_amount,payment_key,rental_date_key,rental_duration,rental_key,rental_rate,replacement_cost,return_date_key,staff_key,store_key,title,_rescued_data,receipt_time,source_file
BECKY,287,MILES,134,618,9.99,7766,20050618,4,2286,4.99,21.99,20050627,1,2,CHAMPION FLATLINERS,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
STELLA,247,MORENO,193,877,5.99,6680,20050618,5,2288,2.99,20.99,20050626,2,2,CROSSROADS CASUALTIES,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
DUANE,513,TUBBS,44,200,7.99,13809,20050618,5,2290,4.99,21.99,20050626,1,2,ATTACKS HATE,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
CHARLOTTE,130,HUNTER,37,173,2.99,3509,20050618,3,2292,2.99,28.99,20050620,2,2,ARIZONA BANG,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
PRISCILLA,273,LOWE,191,869,1.99,7372,20050618,6,2301,0.99,27.99,20050625,2,1,CROOKED FROGMEN,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
VALERIE,149,BLACK,135,622,5.99,4063,20050618,3,2305,2.99,22.99,20050624,2,2,CHANCE RESURRECTION,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
DAN,477,PAINE,46,208,10.99,12888,20050618,3,2306,4.99,13.99,20050627,2,2,AUTUMN CROW,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
MARY,1,SMITH,44,197,4.99,7,20050618,5,2308,4.99,21.99,20050622,2,1,ATTACKS HATE,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
MORRIS,576,MCCARTER,132,611,0.99,15427,20050618,6,2309,0.99,25.99,20050620,1,2,CHAINSAW UPTOWN,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
VIVIAN,184,RUIZ,89,402,0.99,5001,20050618,7,2312,0.99,22.99,20050624,2,2,BORROWERS BEDAZZLED,,2023-04-25T19:54:43.400+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[26]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fadc41eb4f0>

Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_first_name,customer_key,customer_last_name,film_key,inventory_key,payment_amount,payment_key,rental_date_key,rental_duration,rental_key,rental_rate,replacement_cost,return_date_key,staff_key,store_key,title,_rescued_data,receipt_time,source_file
BECKY,287,MILES,134,618,9.99,7766,20050618,4,2286,4.99,21.99,20050627,1,2,CHAMPION FLATLINERS,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
STELLA,247,MORENO,193,877,5.99,6680,20050618,5,2288,2.99,20.99,20050626,2,2,CROSSROADS CASUALTIES,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
DUANE,513,TUBBS,44,200,7.99,13809,20050618,5,2290,4.99,21.99,20050626,1,2,ATTACKS HATE,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
CHARLOTTE,130,HUNTER,37,173,2.99,3509,20050618,3,2292,2.99,28.99,20050620,2,2,ARIZONA BANG,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
PRISCILLA,273,LOWE,191,869,1.99,7372,20050618,6,2301,0.99,27.99,20050625,2,1,CROOKED FROGMEN,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
VALERIE,149,BLACK,135,622,5.99,4063,20050618,3,2305,2.99,22.99,20050624,2,2,CHANCE RESURRECTION,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
DAN,477,PAINE,46,208,10.99,12888,20050618,3,2306,4.99,13.99,20050627,2,2,AUTUMN CROW,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
MARY,1,SMITH,44,197,4.99,7,20050618,5,2308,4.99,21.99,20050622,2,1,ATTACKS HATE,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
MORRIS,576,MCCARTER,132,611,0.99,15427,20050618,6,2309,0.99,25.99,20050620,1,2,CHAINSAW UPTOWN,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json
VIVIAN,184,RUIZ,89,402,0.99,5001,20050618,7,2312,0.99,22.99,20050624,2,2,BORROWERS BEDAZZLED,,2023-04-26T01:39:42.164+0000,dbfs:/FileStore/ds2002-final/stream/fact_rental_3.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_first_name,string,
customer_key,bigint,
customer_last_name,string,
film_key,bigint,
inventory_key,bigint,
payment_amount,double,
payment_key,bigint,
rental_date_key,bigint,
rental_duration,bigint,
rental_key,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_key
    , r.inventory_key
    , r.customer_key
    , r.staff_key
    , r.payment_key
    , r.film_key
    , r.store_key
    , r.payment_amount
    , r.replacement_cost

    , i.title AS film_title
    , i.rental_duration
    , i.rental_rate

    , c.first_name AS customer_first_name
    , c.last_name AS customer_last_name
    , c.email AS customer_email
    , c.phone AS customer_phone_number

    , s.first_name AS employee_first_name
    , s.last_name AS employee_last_name

    , r.rental_date_key
    , rtl.day_name_of_week AS rental_day_name_of_week
    , rtl.day_of_month AS rental_day_of_month
    , rtl.weekday_weekend AS rental_weekday_weekend
    , rtl.month_name AS rental_month_name
    , rtl.calendar_quarter AS rental_quarter
    , rtl.calendar_year AS renta_year

    , r.return_date_key
    , rtn.day_name_of_week AS return_day_name_of_week
    , rtn.day_of_month AS return_day_of_month
    , rtn.weekday_weekend AS return_weekday_weekend
    , rtn.month_name AS return_month_name
    , rtn.calendar_quarter AS return_quarter
    , rtn.calendar_year AS return_year


FROM rentals_silver_tempview AS r
INNER JOIN sakila_dlh.dim_inventory AS i
ON i.inventory_key = r.inventory_key
INNER JOIN sakila_dlh.dim_customers AS c
ON c.customer_key = r.customer_key
INNER JOIN sakila_dlh.dim_staff AS s
ON s.staff_key = r.staff_key

LEFT OUTER JOIN sakila_dlh.dim_date AS rtl
ON rtl.date_key = r.rental_date_key
LEFT OUTER JOIN sakila_dlh.dim_date AS rtn
ON rtn.date_key = r.return_date_key
)


In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[31]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fadc41238b0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver
LIMIT 10

rental_key,inventory_key,customer_key,staff_key,payment_key,film_key,store_key,payment_amount,replacement_cost,film_title,rental_duration,rental_rate,customer_first_name,customer_last_name,customer_email,customer_phone_number,employee_first_name,employee_last_name,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,renta_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
rental_key,bigint,
inventory_key,bigint,
customer_key,bigint,
staff_key,bigint,
payment_key,bigint,
film_key,bigint,
store_key,bigint,
payment_amount,double,
replacement_cost,double,
film_title,string,


Gold Table: Perform Aggregations

In [0]:
%sql
SELECT c.last_name
  , ROUND(AVG(r.payment_amount), 3) AS avg_payment_amount
  FROM sakila_dlh.`fact_rentals_silver` AS r
  INNER JOIN sakila_dlh.dim_customers AS c
  ON r.customer_key = c.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON r.staff_key = s.staff_key
  WHERE s.staff_key = '1'
  GROUP BY c.last_name


last_name,avg_payment_amount
HUNTER,3.49
LITTLE,4.99
ELLINGTON,0.99
BOX,3.99
REYNOLDS,2.99
RUSSELL,4.99
SAUER,3.99
PARKER,9.99
HART,4.99
COUGHLIN,4.99


Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-final/