Import Required Libraries

In [0]:
%pip install pymongo


In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "fwx5ax-azuresqlserver.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "fwx5ax",
  "password" : "Cluett1!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.tjhne"
atlas_database_name = "sakila-db"
atlas_user_name = "fwx5ax"
atlas_password = "lH1tx43vqHYeRMbW"

# Data Files (JSON) Information ###############################
dst_database = "sakila_db"

base_dir = "dbfs:/FileStore"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/Stream Data"

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

False

Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

Fetch Reference Data From an Azure MySQL Database
Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila
COMMENT "Final Project Sakila Database"
LOCATION "dbfs:/FileStore/sakila"
WITH DBPROPERTIES (contains_pii = true, purpose = "Final Project");

Create a Table to Source Date Dimension Data from a Table in Azure MySQL Database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_date",
  user "jtupitza",    
  password "Passw0rd123"  
)

In [0]:
%sql
USE DATABASE sakila;

DROP TABLE IF EXISTS sakila.dim_date;

CREATE OR REPLACE TABLE sakila.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/sakila"
AS SELECT * FROM view_date;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customers
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://fwx5ax-azuresqlserver.mysql.database.azure.com:3306/sakila",
  dbtable "sakila.dim_customer",
  user "fwx5ax",    
  password "Cluett1!"  
)

In [0]:
%sql
USE DATABASE sakila;

CREATE OR REPLACE TABLE sakila.dim_customer
COMMENT "Customers Dimension Table"
LOCATION "dbfs:/FileStore/sakila-db"
AS SELECT * FROM view_customers;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_customer;

col_name,data_type,comment
customer_id,int,
store_id,tinyint,
first_name,varchar(45),
last_name,varchar(45),
email,varchar(50),
address_id,int,
create_date,timestamp,
last_update,timestamp,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila.dim_customer LIMIT 5;

customer_id,store_id,first_name,last_name,email,address_id,create_date,last_update
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,2006-02-14T22:04:36Z,2006-02-15T04:57:20Z


Create a New MongoDB Database and Load JSON Data Into a New MongoDB Collection to Create Dimension Tables

In [0]:
source_dir = '/dbfs/FileStore/sakila-db'
json_files = {"film" : 'DimFilm.json'
              , "inventory" : 'DimInventory.json'
              , "payments" : 'DimPayment.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7ff414b38a40>

In [0]:
%scala
import com.mongodb.spark._

val userName = "fwx5ax"
val pwd = "lH1tx43vqHYeRMbW"
val clusterName = "Cluster0.tjhne"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"


In [0]:
%scala

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila-db")
.option("collection", "film").load()
.select("film_id",
"title",
"description",
"release_year",
"language_id",
"rental_duration",
"rental_rate",
"length",
"replacement_cost",
"rating",
"special_features")

display(df_film)

film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,6,2.99,130,22.99,G,Deleted Scenes
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,1,3,2.99,169,17.99,PG,Deleted Scenes
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,1,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes"
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,1,6,4.99,54,15.99,R,Trailers
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,1,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes"
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,1,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes"


Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila)

In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_film

col_name,data_type,comment
film_id,int,
title,string,
description,string,
release_year,int,
language_id,int,
rental_duration,int,
rental_rate,double,
length,int,
replacement_cost,double,
rating,string,


In [0]:
%sql
SELECT * FROM sakila.dim_film LIMIT 5

film_id,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,1,6,2.99,130,22.99,G,Deleted Scenes


Fetch Inventory Data From The New MongoDB Collection

In [0]:
%scala
val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila-db")
.option("collection", "inventory").load()
.select("inventory_id",
"film_id",
"store_id")

display(df_inventory)

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
16,4,1
17,4,1
18,4,1
19,4,1
26,6,1
27,6,1


Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila)

In [0]:
%scala
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_inventory

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
,,
# Delta Statistics Columns,,
Column Names,"inventory_id, film_id, store_id",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila.dim_inventory LIMIT 5

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
16,4,1


Fetch Payment Data from a File System (DBFS) and
Use PySpark to Read From a CSV File

In [0]:
payment_csv = "/FileStore/Batch/DimPayment.csv"

df_payment = spark.read.format('csv').options(header='true', inferSchema='true').load(payment_csv)
display(df_payment)

payment_id,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,76,2.99,2005-05-25T11:30:37Z
2,1,1,573,0.99,2005-05-28T10:35:23Z
3,1,1,1185,5.99,2005-06-15T00:54:12Z
4,1,2,1422,0.99,2005-06-15T18:02:53Z
5,1,2,1476,9.99,2005-06-15T21:08:46Z
6,1,1,1725,4.99,2005-06-16T15:18:57Z
7,1,1,2308,4.99,2005-06-18T08:41:48Z
8,1,2,2363,0.99,2005-06-18T13:33:59Z
9,1,1,3284,3.99,2005-06-21T06:24:45Z
10,1,2,4526,5.99,2005-07-08T03:17:05Z


In [0]:
df_payment.printSchema()

Use the Spark DataFrame to Create a New Payment Dimension Table in the Databricks Metadata Database (sakila)

In [0]:
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_payment;

col_name,data_type,comment
payment_id,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"customer_id, rental_id, payment_date, amount, payment_id, staff_id",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila.dim_payment LIMIT 5;

payment_id,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,76,2.99,2005-05-25T11:30:37Z
2,1,1,573,0.99,2005-05-28T10:35:23Z
3,1,1,1185,5.99,2005-06-15T00:54:12Z
4,1,2,1422,0.99,2005-06-15T18:02:53Z
5,1,2,1476,9.99,2005-06-15T21:08:46Z


Validate Tables in Database

In [0]:
%sql
USE sakila;
SHOW TABLES

database,tableName,isTemporary
sakila,dim_customer,False
sakila,dim_date,False
sakila,dim_film,False
sakila,dim_inventory,False
sakila,dim_payment,False
,_sqldf,True
,display_query_1,True
,display_query_10,True
,display_query_11,True
,display_query_12,True


Bronze Table: Process Stream Data for Fact Rentals Table

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_id BIGINT NOT NULL")
 .option("cloudFiles.schemaHints", "rental_date BIGINT NOT NULL")
 .option("cloudFiles.schemaHints", "inventory_id BIGINT")
 .option("cloudFiles.schemaHints", "customer_id BIGINT") 
 .option("cloudFiles.schemaHints", "return_date TIMESTAMP")
 .option("cloudFiles.schemaHints", "staff_id BIGINT")

 .option("cloudFiles.schemaLocation", "/FileStore/sakila/bronze_output")
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load("/FileStore/Stream")
 .createOrReplaceTempView("fact_rentals_temp_view"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM fact_rentals_temp_view
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_id,inventory_id,rental_date,rental_id,return_date,staff_id,_rescued_data,receipt_time,source_file
203,4259,2005-08-01 18:26:31,10700,2005-08-07 19:51:31,2,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
538,3958,2005-08-01 18:28:17,10701,2005-08-09 21:51:17,1,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
560,2802,2005-08-01 18:34:59,10702,2005-08-09 23:44:59,2,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
181,1818,2005-08-01 18:37:39,10703,2005-08-07 23:50:39,2,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
594,960,2005-08-01 18:38:02,10704,2005-08-08 20:19:02,1,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
381,4338,2005-08-01 18:38:54,10705,2005-08-04 18:00:54,1,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
147,1183,2005-08-01 18:41:28,10706,2005-08-10 14:30:28,1,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
558,1165,2005-08-01 18:41:34,10707,2005-08-06 12:41:34,1,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
567,3978,2005-08-01 18:43:28,10708,2005-08-09 15:24:28,1,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
418,282,2005-08-01 18:43:57,10709,2005-08-06 13:17:57,2,,2024-12-08T23:39:47.681Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation","FileStore/sakila/bronze_output/checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff3d4306b50>

In [0]:
%sql
SELECT COUNT(*) from fact_rentals_bronze;

count(1)
0


Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_id,inventory_id,rental_date,rental_id,return_date,staff_id,_rescued_data,receipt_time,source_file
203,4259,2005-08-01 18:26:31,10700,2005-08-07 19:51:31,2,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
538,3958,2005-08-01 18:28:17,10701,2005-08-09 21:51:17,1,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
560,2802,2005-08-01 18:34:59,10702,2005-08-09 23:44:59,2,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
181,1818,2005-08-01 18:37:39,10703,2005-08-07 23:50:39,2,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
594,960,2005-08-01 18:38:02,10704,2005-08-08 20:19:02,1,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
381,4338,2005-08-01 18:38:54,10705,2005-08-04 18:00:54,1,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
147,1183,2005-08-01 18:41:28,10706,2005-08-10 14:30:28,1,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
558,1165,2005-08-01 18:41:34,10707,2005-08-06 12:41:34,1,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
567,3978,2005-08-01 18:43:28,10708,2005-08-09 15:24:28,1,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json
418,282,2005-08-01 18:43:57,10709,2005-08-06 13:17:57,2,,2024-12-08T23:39:48.676Z,/FileStore/Stream/Stream%20Data%20copy/FactRentals_Table3.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_id,bigint,
inventory_id,bigint,
rental_date,string,
rental_id,bigint,
return_date,string,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


Create Silver Table

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.customer_id,
  r.inventory_id,
  r.rental_date,
  r.rental_id,
  r.return_date,
  p.payment_id,
  p.staff_id,
  p.amount AS payment_amount,
  p.payment_date,
  c.store_id,
  c.first_name AS customer_first_name,
  c.last_name AS customer_last_name,
  c.email AS customer_email,
  c.address_id AS customer_address_id,
  c.create_date AS customer_create_date,
  c.last_update AS customer_last_update,
  f.film_id,
  f.title AS film_title,
  f.description AS film_description,
  f.release_year AS film_release_year,
  f.language_id AS film_language_id,
  f.rental_duration AS film_rental_duration, 
  f.rental_rate AS film_rental_rate,
  f.replacement_cost AS film_replacement_cost,
  f.length AS film_length,
  f.rating AS film_rating,
  f.special_features AS film_special_features
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila.dim_customer AS c
  ON c.customer_id = r.customer_id 
  INNER JOIN sakila.dim_payment as p
  ON p.rental_id = r.rental_id
  INNER JOIN sakila.dim_inventory AS i 
  ON i.inventory_id = r.inventory_id
  INNER JOIN sakila.dim_film AS f
  ON f.film_id = i.film_id
  LEFT OUTER JOIN sakila.dim_date AS dd
  ON dd.date_key = r.return_date)


In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "FileStore/sakila/silver_output")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff3d4340ad0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

customer_id,inventory_id,rental_date,rental_id,return_date,payment_id,staff_id,payment_amount,payment_date,store_id,customer_first_name,customer_last_name,customer_email,customer_address_id,customer_create_date,customer_last_update,film_id,film_title,film_description,film_release_year,film_language_id,film_rental_duration,film_rental_rate,film_replacement_cost,film_length,film_rating,film_special_features


Perform Aggregations with Gold Tables

In [0]:
%sql
CREATE OR REPLACE TABLE sakila.fact_rental_payments_by_customer AS (
  SELECT customer_id AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    ,SUM(payment_amount) AS total_payment_amount
  FROM sakila.fact_rentals_silver
  GROUP BY CustomerID, LastName, FirstName
  ORDER BY SUM(payment_amount) DESC);

SELECT * FROM sakila.fact_rental_payments_by_customer;

CustomerID,LastName,FirstName,total_payment_amount


In [0]:
%sql

CREATE OR REPLACE TABLE sakila.fact_revenue_by_film AS (
  SELECT film_id AS FilmID 
    , film_title AS FilmTitle
    ,SUM(payment_amount) AS TotalRevenue
  FROM sakila.fact_rentals_silver
  GROUP BY FilmID, FilmTitle
  ORDER BY TotalRevenue DESC);

SELECT * FROM sakila.fact_revenue_by_film;

FilmID,FilmTitle,TotalRevenue
879,TELEGRAPH VOYAGE,231.73
973,WIFE TURN,223.69
1000,ZORRO ARK,214.69000000000003
369,GOODFELLAS SALUTE,209.69
764,SATURDAY LAMBS,204.72
893,TITANS JERK,201.71
897,TORQUE BOUND,198.72
403,HARRY IDAHO,195.7
460,INNOCENT USUAL,191.74
444,HUSTLER PARTY,190.78


In [0]:
%sql

CREATE OR REPLACE TABLE sakila.fact_rental_film_popularity AS (
  SELECT film_id AS FilmID 
    , film_title AS FilmTitle
    ,COUNT(film_title) AS TotalRentals
  FROM sakila.fact_rentals_silver
  GROUP BY FilmID, FilmTitle
  ORDER BY TotalRentals DESC);

SELECT * FROM sakila.fact_rental_film_popularity;

FilmID,FilmTitle,TotalRentals
103,BUCKET BROTHERHOOD,34
738,ROCKETEER MOTHER,33
489,JUGGLER HARDLY,32
767,SCALAWAG DUCK,32
382,GRIT CLOCKWORK,32
730,RIDGEMONT SUBMARINE,32
331,FORWARD TEMPLE,32
418,HOBBIT ALIEN,31
1000,ZORRO ARK,31
621,NETWORK PEAK,31


In [0]:
%sql

CREATE OR REPLACE TABLE sakila.fact_rental_film_rating_revenue AS (
  SELECT
    film_rating AS FilmRating
    ,SUM(payment_amount) AS TotalRevenue
  FROM sakila.fact_rentals_silver
  GROUP BY FilmRating
  ORDER BY TotalRevenue DESC);

SELECT * FROM sakila.fact_rental_film_rating_revenue;

FilmRating,TotalRevenue
PG-13,15259.15999999944
NC-17,13875.069999999503
PG,13337.909999999529
R,13270.189999999531
G,11664.229999999614
