## Final Capstone: Sakila
This capstone project requires students to demonstrate a practical, working understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e., Star Schema).*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # pandas API on Spark (formerly Koalas), bundled with PySpark 3.2 and newer; `pd` here is not plain pandas.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "cre3nue-mysql2.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_etl"

connection_properties = {
  "user" : "cre3nue",
  "password" : "SequelSeashell88!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002cluster.pohbzdx"
atlas_database_name = "sakila_etl"
atlas_user_name = "cre3nue"
atlas_password = "LeafLetAndLetLive432"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/FinalProjectSakila"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

value_stream_dir = f"{stream_dir}/fact_value"

value_output_bronze = f"{database_dir}/fact_value/bronze"
value_output_silver = f"{database_dir}/fact_value/silver"
value_output_gold   = f"{database_dir}/fact_value/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_value", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[42]: True
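
The cell above keeps user names and passwords inline. As a hedged alternative, the sketch below reads them from environment variables instead. The variable names (`MYSQL_USER`, `MYSQL_PASSWORD`, `ATLAS_USER`, `ATLAS_PASSWORD`) and the helper `load_connection_settings` are hypothetical and not part of the original notebook; on Databricks, a secret scope read with `dbutils.secrets.get(scope, key)` would be another option.

```python
import os

# Hypothetical environment variable names; adjust to your own setup.
REQUIRED_VARS = ("MYSQL_USER", "MYSQL_PASSWORD", "ATLAS_USER", "ATLAS_PASSWORD")

def load_connection_settings(env=os.environ):
    """Read connection secrets from a mapping instead of hard-coding them."""
    missing = [name for name in REQUIRED_VARS if name not in env]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "connection_properties": {
            "user": env["MYSQL_USER"],
            "password": env["MYSQL_PASSWORD"],
            "driver": "org.mariadb.jdbc.Driver",
        },
        "atlas_user_name": env["ATLAS_USER"],
        "atlas_password": env["ATLAS_PASSWORD"],
    }

# Demonstration with a stand-in mapping rather than the real environment.
demo_env = {
    "MYSQL_USER": "demo_user",
    "MYSQL_PASSWORD": "demo_password",
    "ATLAS_USER": "demo_atlas_user",
    "ATLAS_PASSWORD": "demo_atlas_password",
}
settings = load_connection_settings(demo_env)
print(settings["connection_properties"]["user"])
```

The returned dictionary mirrors the shape of `connection_properties` and the Atlas variables used in this notebook, so the rest of the cells could consume it without further changes.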

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection and return them as a DataFrame.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Query MongoDB; empty or None conditions/projection fall back to "all documents, all fields"
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        # Apply the sort whenever one is given, even without conditions or a projection
        if sort:
            cursor = cursor.sort(sort)
        # Fill a Python list with the documents to create a DataFrame
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even if the query fails
        client.close()

    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Drop and re-create one collection per JSON file; return the last InsertManyResult (None if no files).'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    # Initialize the result so an empty json_files mapping does not raise an UnboundLocalError
    result = None

    # Read in each JSON file, and use it to create a new collection
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

    client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Sakila Capstone Database"
LOCATION "dbfs:/FileStore/FinalProjectSakila/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://cre3nue-mysql2.mysql.database.azure.com:3306/sakila_etl",
  dbtable "dim_date",
  user "cre3nue",
  password "SequelSeashell88!"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/FinalProjectSakila/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
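
The rows above show that `date_key` encodes the calendar date as a `yyyymmdd` integer (for example, `20000101` for `2000-01-01`). A minimal sketch of converting in both directions follows; the helper names `date_key_to_date` and `date_to_date_key` are hypothetical and only illustrate the encoding.

```python
from datetime import date, datetime

def date_key_to_date(date_key: int) -> date:
    """Decode a yyyymmdd integer key into a date."""
    return datetime.strptime(str(date_key), "%Y%m%d").date()

def date_to_date_key(d: date) -> int:
    """Encode a date as a yyyymmdd integer key."""
    return d.year * 10000 + d.month * 100 + d.day

print(date_key_to_date(20000101))           # 2000-01-01
print(date_to_date_key(date(2005, 5, 27)))  # 20050527
```

Because the key is a plain integer, the fact table can join to `dim_date` on it without any date parsing.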


##### 1.3. Create a New Table that Sources Film Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_film" that extracts data from your MySQL Sakila database.
CREATE OR REPLACE TEMPORARY VIEW view_film
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://cre3nue-mysql2.mysql.database.azure.com:3306/sakila_etl",
  dbtable "dim_film",
  user "cre3nue",
  password "SequelSeashell88!"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_film" using data from the view named "view_film"
CREATE OR REPLACE TABLE sakila_dlh.dim_film
COMMENT "Film Dimension Table"
LOCATION "dbfs:/FileStore/FinalProjectSakila/sakila_dlh/dim_film"
AS SELECT * FROM view_film

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film;

col_name,data_type,comment
film_key,bigint,
title,string,
release_year,bigint,
language_id,bigint,
rental_duration,bigint,
rental_rate,double,
length,bigint,
replacement_cost,double,
rating,string,
special_features,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,2006,1,6,2.99,130,22.99,G,Deleted Scenes


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# If nothing is listed, open the FinalProjectSakila folder in DBFS, right-click the white space, choose "Upload Here", and drag in the source_data folder from the DS2002-002-main zip.

display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/FinalProjectSakila/source_data/batch/SakilaDimFilm.json,SakilaDimFilm.json,273689,1683852831000
dbfs:/FileStore/FinalProjectSakila/source_data/batch/SakilaDimInventory.json,SakilaDimInventory.json,69401,1683852831000
dbfs:/FileStore/FinalProjectSakila/source_data/batch/SakilaDimStaff.csv,SakilaDimStaff.csv,164,1683932428000
dbfs:/FileStore/FinalProjectSakila/source_data/batch/SakilaDimStaff.json,SakilaDimStaff.json,353,1683852831000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/FinalProjectSakila/source_data/batch'
json_files = {"film" : 'SakilaDimFilm.json', "inventory" : 'SakilaDimInventory.json', "staff" : 'SakilaDimStaff.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[55]: <pymongo.results.InsertManyResult at 0x7f0a10fc7b80>
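
The idempotency noted for this step comes from the drop-then-insert pattern inside `set_mongo_collection()`. The sketch below illustrates the idea with a plain dictionary standing in for the MongoDB database; `load_collections` and `append_collections` are hypothetical helpers, not PyMongo calls.

```python
def load_collections(store, json_documents):
    """Drop-then-insert: repeated runs leave the store in the same state."""
    for collection_name, documents in json_documents.items():
        store.pop(collection_name, None)          # stands in for db.drop_collection(...)
        store[collection_name] = list(documents)  # stands in for insert_many(...)
    return store

def append_collections(store, json_documents):
    """Insert without dropping: each rerun duplicates the documents."""
    for collection_name, documents in json_documents.items():
        store.setdefault(collection_name, []).extend(documents)
    return store

docs = {"staff": [{"staff_key": 1, "first_name": "Mike"},
                  {"staff_key": 2, "first_name": "Jon"}]}

idempotent_db, appending_db = {}, {}
for _ in range(2):  # run each loader twice
    load_collections(idempotent_db, docs)
    append_collections(appending_db, docs)

print(len(idempotent_db["staff"]))  # 2
print(len(appending_db["staff"]))   # 4
```

Running the drop-then-insert loader twice still yields two staff documents, while the append-only variant doubles them.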

##### 2.3.1. Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_film = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("uri",  f"mongodb+srv://cre3nue:LeafLetAndLetLive432@ds2002cluster.pohbzdx.mongodb.net/sakila_etl")
.option("database", "sakila_etl").option("collection", "film").load()
.select("film_key","title","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","special_features")

display(df_film)

film_key,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,2006,1,6,2.99,130,22.99,G,Deleted Scenes
6,AGENT TRUMAN,2006,1,3,2.99,169,17.99,PG,Deleted Scenes
7,AIRPLANE SIERRA,2006,1,6,4.99,62,28.99,PG-13,"Trailers,Deleted Scenes"
8,AIRPORT POLLOCK,2006,1,6,4.99,54,15.99,R,Trailers
9,ALABAMA DEVIL,2006,1,3,2.99,114,21.99,PG-13,"Trailers,Deleted Scenes"
10,ALADDIN CALENDAR,2006,1,6,4.99,63,24.99,NC-17,"Trailers,Deleted Scenes"


In [0]:
%scala
df_film.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Film Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_film")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_film

col_name,data_type,comment
film_key,bigint,
title,string,
release_year,bigint,
language_id,bigint,
rental_duration,bigint,
rental_rate,double,
length,bigint,
replacement_cost,double,
rating,string,
special_features,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_film LIMIT 5

film_key,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
1,ACADEMY DINOSAUR,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
2,ACE GOLDFINGER,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
3,ADAPTATION HOLES,2006,1,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
4,AFFAIR PREJUDICE,2006,1,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
5,AFRICAN EGG,2006,1,6,2.99,130,22.99,G,Deleted Scenes


##### 2.4.1. Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_inventory = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("uri",  f"mongodb+srv://cre3nue:LeafLetAndLetLive432@ds2002cluster.pohbzdx.mongodb.net/sakila_etl")
.option("database", "sakila_etl").option("collection", "inventory").load()
.select("inventory_key","film_key","store_key")

display(df_inventory)

inventory_key,film_key,store_key
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2
6,1,2
7,1,2
8,1,2
9,2,2
10,2,2


In [0]:
%scala
df_inventory.printSchema()

##### 2.4.2. Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_key,int,
film_key,int,
store_key,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,MANAGED,
Location,dbfs:/FileStore/FinalProjectSakila/sakila_dlh/dim_inventory,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_key,film_key,store_key
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
staff_csv = f"{batch_dir}/SakilaDimStaff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_key,first_name,last_name,address_id,email,store_id,active
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1


In [0]:
df_staff.printSchema()

root
 |-- staff_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_key,first_name,last_name,address_id,email,store_id,active
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_date,False
sakila_dlh,dim_film,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_staff,False
,display_query_10,True
,display_query_11,True
,fact_value_silver_tempview,True
,value_bronze_tempview,True
,value_raw_tempview,True
,value_silver_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Value Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Repeated .option() calls with the same key overwrite one another,
# so all schema hints are passed as a single comma-separated string.
value_schema_hints = ", ".join([
    "fact_value_key BIGINT", "film_key BIGINT", "inventory_key BIGINT",
    "rental_key BIGINT", "store_key BIGINT", "title STRING",
    "release_year BIGINT", "rating STRING", "language_id BIGINT",
    "rental_rate DOUBLE", "amount DOUBLE", "replacement_cost DOUBLE",
    "rental_date_key BIGINT", "payment_date_key BIGINT",
    "return_date_key BIGINT", "last_update STRING",
])

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", value_schema_hints)
 .option("cloudFiles.schemaLocation", value_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(value_stream_dir)
 .createOrReplaceTempView("value_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW value_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM value_raw_tempview
)

In [0]:
%sql
SELECT * FROM value_bronze_tempview

amount,fact_value_key,film_key,inventory_key,language_id,last_update,payment_date_key,rating,release_year,rental_date_key,rental_key,rental_rate,replacement_cost,return_date_key,store_key,title,_rescued_data,receipt_time,source_file
2.99,2,83,377,1,2006-02-15 21:30:53,20050528,G,2006,20050528,591,2.99,18.99,20050529,2,BLUES INSTINCT,,2023-05-12T23:48:40.834+0000,dbfs:/FileStore/FinalProjectSakila/source_data/stream/fact_value/SakilaFactValue02.json
2.99,1,164,751,1,2006-02-15 21:30:53,20050527,PG,2006,20050527,337,0.99,20.99,20050602,2,COAST RAINBOW,,2023-05-12T23:48:40.834+0000,dbfs:/FileStore/FinalProjectSakila/source_data/stream/fact_value/SakilaFactValue01.json
6.99,3,86,390,1,2006-02-15 21:30:53,20050530,R,2006,20050530,987,4.99,11.99,20050607,1,BOOGIE AMELIE,,2023-05-12T23:48:40.834+0000,dbfs:/FileStore/FinalProjectSakila/source_data/stream/fact_value/SakilaFactValue03.json


In [0]:
(spark.table("value_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{value_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_value_bronze"))

Out[69]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0a1928dac0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_value_bronze")
  .createOrReplaceTempView("value_silver_tempview"))

In [0]:
%sql
SELECT * FROM value_silver_tempview

amount,fact_value_key,film_key,inventory_key,language_id,last_update,payment_date_key,rating,release_year,rental_date_key,rental_key,rental_rate,replacement_cost,return_date_key,store_key,title,_rescued_data,receipt_time,source_file
2.99,2,83,377,1,2006-02-15 21:30:53,20050528,G,2006,20050528,591,2.99,18.99,20050529,2,BLUES INSTINCT,,2023-05-12T23:48:42.522+0000,dbfs:/FileStore/FinalProjectSakila/source_data/stream/fact_value/SakilaFactValue02.json
2.99,1,164,751,1,2006-02-15 21:30:53,20050527,PG,2006,20050527,337,0.99,20.99,20050602,2,COAST RAINBOW,,2023-05-12T23:48:42.522+0000,dbfs:/FileStore/FinalProjectSakila/source_data/stream/fact_value/SakilaFactValue01.json
6.99,3,86,390,1,2006-02-15 21:30:53,20050530,R,2006,20050530,987,4.99,11.99,20050607,1,BOOGIE AMELIE,,2023-05-12T23:48:42.522+0000,dbfs:/FileStore/FinalProjectSakila/source_data/stream/fact_value/SakilaFactValue03.json


In [0]:
%sql
DESCRIBE EXTENDED value_silver_tempview

col_name,data_type,comment
amount,double,
fact_value_key,bigint,
film_key,bigint,
inventory_key,bigint,
language_id,bigint,
last_update,string,
payment_date_key,bigint,
rating,string,
release_year,bigint,
rental_date_key,bigint,


In [0]:
%sql
USE sakila_dlh;
-- fact value:  amount, fact_value_key, film_key, inventory_key, language_id, last_update, payment_date_key, rating, release_year, rental_date_key, rental_key, rental_rate, replacement_cost, return_date_key, store_key, title
-- dim staff:  staff_key, first_name, last_name, address_id, email, store_id, active
-- dim inventory:  inventory_key, store_key, film_key
-- dim film:  film_key, title, release_year, language_id, rental_duration, rental_rate, length, replacement_cost, rating, special_features
CREATE OR REPLACE TEMPORARY VIEW fact_value_silver_tempview AS (
  SELECT v.fact_value_key,
      s.staff_key,
      s.last_name AS employee_last_name,
      s.first_name AS employee_first_name,
      s.email AS employee_email,
      v.inventory_key,
      i.store_key,
      v.film_key,
      f.special_features,
      f.length AS film_length,
      f.rental_duration,
      v.payment_date_key,
      pd.day_name_of_week AS paid_day_name_of_week,
      pd.day_of_month AS paid_day_of_month,
      pd.weekday_weekend AS paid_weekday_weekend,
      pd.month_name AS paid_month_name,
      pd.calendar_quarter AS paid_calendar_quarter,
      pd.calendar_year AS paid_calendar_year,
      v.rental_date_key,
      rend.day_name_of_week AS rental_day_name_of_week,
      rend.day_of_month AS rental_day_of_month,
      rend.weekday_weekend AS rental_weekday_weekend,
      rend.month_name AS rental_month_name,
      rend.calendar_quarter AS rental_calendar_quarter,
      rend.calendar_year AS rental_calendar_year,
      v.return_date_key,
      retd.day_name_of_week AS return_day_name_of_week,
      retd.day_of_month AS return_day_of_month,
      retd.weekday_weekend AS return_weekday_weekend,
      retd.month_name AS return_month_name,
      retd.calendar_quarter AS return_calendar_quarter,
      retd.calendar_year AS return_calendar_year,
      v.amount,
      v.language_id,
      v.rating,
      v.release_year,
      v.rental_rate,
      v.replacement_cost,
      v.title
  FROM value_silver_tempview AS v
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.store_id = v.store_key
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_key = v.inventory_key
  INNER JOIN sakila_dlh.dim_film AS f
  ON f.film_key = v.film_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS pd
  ON pd.date_key = v.payment_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rend
  ON rend.date_key = v.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS retd
  ON retd.date_key = v.return_date_key
)
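
The view above joins `dim_date` three times under different aliases (`pd`, `rend`, `retd`), once for each date key on the fact row. A rough sketch of the same idea in plain Python follows; the in-memory `dim_date` dictionary and the helpers `make_date_row` and `add_date_roles` are hypothetical stand-ins for the real tables.

```python
from datetime import datetime

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

def make_date_row(date_key):
    """Build a tiny stand-in for one dim_date row from a yyyymmdd key."""
    d = datetime.strptime(str(date_key), "%Y%m%d").date()
    return {
        "day_name_of_week": DAY_NAMES[d.weekday()],
        "day_of_month": d.day,
        "weekday_weekend": "Weekend" if d.weekday() >= 5 else "Weekday",
    }

# Hypothetical in-memory date dimension keyed by date_key
dim_date = {key: make_date_row(key) for key in (20050527, 20050602)}

# One entry per role the date dimension plays on the fact row
ROLES = {"paid": "payment_date_key", "rental": "rental_date_key", "return": "return_date_key"}

def add_date_roles(fact):
    """Look up the same dimension once per role, prefixing columns by role."""
    out = dict(fact)
    for prefix, key_column in ROLES.items():
        date_row = dim_date.get(fact[key_column])  # None when unmatched, like a LEFT OUTER JOIN
        for column in ("day_name_of_week", "day_of_month", "weekday_weekend"):
            out[f"{prefix}_{column}"] = date_row[column] if date_row else None
    return out

fact = {"fact_value_key": 1, "payment_date_key": 20050527,
        "rental_date_key": 20050527, "return_date_key": 20050602}
row = add_date_roles(fact)
print(row["rental_day_name_of_week"], row["return_day_name_of_week"])  # Friday Thursday
```

Prefixing each set of columns by role keeps the three lookups distinguishable in the output, which is what the `paid_`, `rental_`, and `return_` column names do in the SQL.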

In [0]:
(spark.table("fact_value_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{value_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_value_silver"))

Out[74]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0a1928d970>

In [0]:
%sql
SELECT * FROM fact_value_silver

fact_value_key,staff_key,employee_last_name,employee_first_name,employee_email,inventory_key,store_key,film_key,special_features,film_length,rental_duration,payment_date_key,paid_day_name_of_week,paid_day_of_month,paid_weekday_weekend,paid_month_name,paid_calendar_quarter,paid_calendar_year,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_calendar_quarter,rental_calendar_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_calendar_quarter,return_calendar_year,amount,language_id,rating,release_year,rental_rate,replacement_cost,title
1,2,Stephens,Jon,Jon.Stephens@sakilastaff.com,751,2,164,"Trailers,Commentaries,Deleted Scenes,Behind the Scenes",55,4,20050527,Friday,27,Weekday,May,2,2005,20050527,Friday,27,Weekday,May,2,2005,20050602,Thursday,2,Weekday,June,2,2005,2.99,1,PG,2006,0.99,20.99,COAST RAINBOW
2,2,Stephens,Jon,Jon.Stephens@sakilastaff.com,377,2,83,"Trailers,Deleted Scenes,Behind the Scenes",50,5,20050528,Saturday,28,Weekend,May,2,2005,20050528,Saturday,28,Weekend,May,2,2005,20050529,Sunday,29,Weekend,May,2,2005,2.99,1,G,2006,2.99,18.99,BLUES INSTINCT
3,1,Hillyer,Mike,Mike.Hillyer@sakilastaff.com,390,1,86,"Commentaries,Behind the Scenes",121,6,20050530,Monday,30,Weekday,May,2,2005,20050530,Monday,30,Weekday,May,2,2005,20050607,Tuesday,7,Weekday,June,2,2005,6.99,1,R,2006,4.99,11.99,BOOGIE AMELIE


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_value_silver

col_name,data_type,comment
fact_value_key,bigint,
staff_key,int,
employee_last_name,string,
employee_first_name,string,
employee_email,string,
inventory_key,bigint,
store_key,int,
film_key,bigint,
special_features,string,
film_length,bigint,


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT staff_key AS EmployeeID
  , employee_last_name AS LastName
  , employee_first_name AS FirstName
  , rental_weekday_weekend AS RentalDay
  , SUM(amount) AS TotalTransactionPrice
FROM sakila_dlh.fact_value_silver
GROUP BY EmployeeID, LastName, FirstName, RentalDay
ORDER BY TotalTransactionPrice DESC

EmployeeID,LastName,FirstName,RentalDay,TotalTransactionPrice
1,Hillyer,Mike,Weekday,6.99
2,Stephens,Jon,Weekday,2.99
2,Stephens,Jon,Weekend,2.99


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/FinalProjectSakila