## DS2002 Project 2 - Helena Moore
Dimension tables: dim_film (film information), dim_customer (customer information), dim_date (date dimension which allows the analysis of business processes over time, namely in the fact_rental table), dim_store (store information)

Fact table: fact_rental (fact table which combines the dimension tables, shows the rental business process)

#### Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysql-jsk9gb.mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "jsk9gb",
  "password" : "pinkMango33",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.mrqfcb7"
atlas_database_name = "sakila_dw"
atlas_user_name = "jsk9gb"
atlas_password = "pinkMango33"


# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/final_project"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/project_data" #source data 
batch_dir = f"{data_dir}/batch" # batch 
stream_dir = f"{data_dir}/stream" #stream

# rental stream 
rental_stream_dir1 = f"{stream_dir}"

rental_output_bronze1 = f"{database_dir}/fact_rental1/bronze"
rental_output_silver1 = f"{database_dir}/fact_rental1/silver"
rental_output_gold1   = f"{database_dir}/fact_rental1/gold"


# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental1", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Project 2"
--FIX THIS LOCATION CHANGE FILE PATH
LOCATION "dbfs:/FileStore/final_project/sakila_dlh"
--CHANGE PURPOSE TO THE RIGHT NAME TOO 
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project 2");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-jsk9gb.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "jsk9gb",    --Replace with your User Name
  password "pinkMango33"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"

LOCATION "dbfs:/FileStore/final_project/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,varchar(65535),
date_name,varchar(65535),
date_name_us,varchar(65535),
date_name_eu,varchar(65535),
day_of_week,int,
day_name_of_week,varchar(65535),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(65535),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_customer" that extracts data from your MySQL database.

CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-jsk9gb.mysql.database.azure.com:3306/sakila_dw", 
  dbtable "dim_customer",
  user "jsk9gb",    
  password "pinkMango33"  
)


In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_customer" using data from the view named "view_customer"

USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Film Dimension Table"
LOCATION "dbfs:/FileStore/final_project/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,varchar(65535),
store_id,varchar(65535),
first_name,varchar(65535),
last_name,varchar(65535),
active,varchar(65535),
create_date,varchar(65535),
,,
# Delta Statistics Columns,,
Column Names,"first_name, store_id, create_date, last_name, customer_key, active",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_id,first_name,last_name,active,create_date
1,1,MARY,SMITH,1,2006-02-14 22:04:36
2,1,PATRICIA,JOHNSON,1,2006-02-14 22:04:36
3,1,LINDA,WILLIAMS,1,2006-02-14 22:04:36
4,2,BARBARA,JONES,1,2006-02-14 22:04:36
5,1,ELIZABETH,BROWN,1,2006-02-14 22:04:36


#### Fetch Reference Data from a MongoDB Atlas Database
##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir)) # '/dbfs/FileStore/final_project/project_data/batch'

path,name,size,modificationTime
dbfs:/FileStore/final_project/project_data/batch/sakiladimcustomer.json,sakiladimcustomer.json,81877,1715297309000
dbfs:/FileStore/final_project/project_data/batch/sakiladimdate.json,sakiladimdate.json,2286359,1715297314000
dbfs:/FileStore/final_project/project_data/batch/sakiladimfilm.json,sakiladimfilm.json,333387,1715297327000
dbfs:/FileStore/final_project/project_data/batch/sakiladimstore.csv,sakiladimstore.csv,174,1715297300000


##### Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:

source_dir = '/dbfs/FileStore/final_project/project_data/batch'

json_files = {"films" : 'sakiladimfilm.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7fc050c07d00>

##### Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "jsk9gb"
val pwd = "pinkMango33"
val clusterName = "cluster0.mrqfcb7"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_films = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")

.option("collection", "films").load()

.select("film_key","title","description","release_year","rental_duration","rental_rate","replacement_cost","rating","special_features","category")

display(df_films)

film_key,title,description,release_year,rental_duration,rental_rate,replacement_cost,rating,special_features,category
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,6,0.99,20.99,PG,"Deleted Scenes,Behind the Scenes",Documentary
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,3,4.99,12.99,G,"Trailers,Deleted Scenes",Horror
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,7,2.99,18.99,NC-17,"Trailers,Deleted Scenes",Documentary
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,5,2.99,26.99,G,"Commentaries,Behind the Scenes",Horror
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,6,2.99,22.99,G,Deleted Scenes,Family
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,3,2.99,17.99,PG,Deleted Scenes,Foreign
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,6,4.99,28.99,PG-13,"Trailers,Deleted Scenes",Comedy
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,6,4.99,15.99,R,Trailers,Horror
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,3,2.99,21.99,PG-13,"Trailers,Deleted Scenes",Horror
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,6,4.99,24.99,NC-17,"Trailers,Deleted Scenes",Sports


In [0]:
%scala

df_films.printSchema()

##### Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:

%scala
df_films.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_films")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_films

col_name,data_type,comment
film_key,int,
title,string,
description,string,
release_year,int,
rental_duration,int,
rental_rate,double,
replacement_cost,double,
rating,string,
special_features,string,
category,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_films LIMIT 5

film_key,title,description,release_year,rental_duration,rental_rate,replacement_cost,rating,special_features,category
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,6,0.99,20.99,PG,"Deleted Scenes,Behind the Scenes",Documentary
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,3,4.99,12.99,G,"Trailers,Deleted Scenes",Horror
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,7,2.99,18.99,NC-17,"Trailers,Deleted Scenes",Documentary
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,5,2.99,26.99,G,"Commentaries,Behind the Scenes",Horror
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,6,2.99,22.99,G,Deleted Scenes,Family


#### Fetch Data from a File System
##### Use PySpark to Read From a CSV File


In [0]:
store_csv = f"{batch_dir}/sakiladimstore.csv"

df_stores = spark.read.format('csv').options(header='true', inferSchema='true',delimiter=';',quote='"').load(store_csv)
display(df_stores)

store_key,manager_staff_id,address_id,last_update,store_address
1,1,1,2024-05-09T18:36:05Z,47 MySakila Drive
2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard


In [0]:
df_stores.printSchema()

root
 |-- store_key: integer (nullable = true)
 |-- manager_staff_id: integer (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- store_address: string (nullable = true)



In [0]:
df_stores.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_store")


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store;

col_name,data_type,comment
store_key,int,
manager_staff_id,int,
address_id,int,
last_update,timestamp,
store_address,string,
,,
# Delta Statistics Columns,,
Column Names,"address_id, manager_staff_id, last_update, store_address, store_key",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5;

store_key,manager_staff_id,address_id,last_update,store_address
1,1,1,2024-05-09T18:36:05Z,47 MySakila Drive
2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_films,False
sakila_dlh,dim_store,False
,view_customer,True
,view_date,True


### Section III: Integrate Reference Data with Real-Time Data
#### Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### Bronze Table: Process 'Raw' JSON Data -- processes 3 json files of data (3 intervals)

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "inventory_id BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "return_date DATETIME")
 .option("cloudFiles.schemaHints", "store_key BIGINT")
 .option("cloudFiles.schemaHints", "title TEXT")
 .option("cloudFiles.schemaHints", "rental_date_key BIGINT")
 .option("cloudFiles.schemaLocation", rental_output_bronze1)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_stream_dir1)
 .createOrReplaceTempView("rental_raw_tempview1"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_bronze_tempview1 AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_raw_tempview1
)

In [0]:
%sql
SELECT * FROM rental_bronze_tempview1

customer_key,film_key,inventory_id,rental_date_key,rental_key,return_date,store_key,title,_rescued_data,receipt_time,source_file
130,80,367,20050524,1,2005-05-26 22:04:30,1,BLANKET BEVERLY,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
459,333,1525,20050524,2,2005-05-28 19:40:33,2,FREAKY POCUS,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
408,373,1711,20050524,3,2005-06-01 22:12:39,2,GRADUATE LORD,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
333,535,2452,20050524,4,2005-06-03 01:43:41,1,LOVE SUICIDES,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
222,450,2079,20050524,5,2005-06-02 04:33:21,2,IDOLS SNATCHERS,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
549,613,2792,20050524,6,2005-05-27 01:32:07,1,MYSTIC TRUMAN,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
269,870,3995,20050524,7,2005-05-29 20:34:53,2,SWARM GOLD,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
239,510,2346,20050524,8,2005-05-27 23:33:46,1,LAWLESS VISION,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
126,565,2580,20050525,9,2005-05-28 00:22:40,1,MATRIX SNOWMAN,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json
399,396,1824,20050525,10,2005-05-31 22:44:21,2,HANGING DEEP,,2024-05-10T20:14:03.909Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental1.json


In [0]:
(spark.table("rental_bronze_tempview1")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_bronze1}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_bronze1"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc0509c0bb0>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rental_bronze1")
  .createOrReplaceTempView("rental_silver_tempview1"))

In [0]:
%sql
SELECT * FROM rental_silver_tempview1

customer_key,film_key,inventory_id,rental_date_key,rental_key,return_date,store_key,title,_rescued_data,receipt_time,source_file
468,134,617,20050525,101,2005-05-31 19:47:04,2,CHAMPION FLATLINERS,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
343,82,373,20050525,102,2005-05-31 19:47:10,1,BLOOD ARGONAUTS,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
384,735,3343,20050525,103,2005-06-03 22:36:42,1,ROBBERS JOON,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
310,932,4281,20050525,104,2005-05-27 15:20:33,1,VALLEY PACKER,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
108,173,794,20050525,105,2005-05-30 12:03:12,2,CONFESSIONS MAGUIRE,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
196,791,3627,20050525,106,2005-06-04 00:01:19,2,SHOW LORD,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
317,621,2833,20050525,107,2005-06-03 22:46:09,2,NETWORK PEAK,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
242,724,3289,20050525,108,2005-05-30 19:40:05,2,REMEMBER DIARY,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
503,233,1044,20050525,109,2005-05-29 20:39:20,1,DISCIPLE MOTHER,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json
19,893,4108,20050525,110,2005-06-03 18:13:49,2,TITANS JERK,,2024-05-10T20:14:05.069Z,dbfs:/FileStore/final_project/project_data/stream/sakilafactrental3.json


In [0]:
%sql
DESCRIBE EXTENDED rental_silver_tempview1

col_name,data_type,comment
customer_key,bigint,
film_key,bigint,
inventory_id,bigint,
rental_date_key,bigint,
rental_key,bigint,
return_date,string,
store_key,bigint,
title,string,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_silver_tempview1 AS (
  SELECT rental.customer_key AS rental_customer_key, 
  rental.film_key AS rental_film_key, 
  rental.inventory_id,
  rental.rental_date_key,
  rental.rental_key,
  rental.return_date,
  rental.store_key AS rental_store_key,
  rental.title,
  store.store_key,
  store.manager_staff_id,
  store.address_id,
  store.last_update,
  store.store_address,
  cust.customer_key,
  cust.store_id,
  cust.last_name,
  cust.active,
  cust.create_date,
  film.film_key,
  film.title AS film_title,
  film.release_year,
  film.rental_duration,
  film.rental_rate,
  film.rating,
  film.category,
  dimdate.date_key,
  dimdate.full_date
  FROM rental_silver_tempview1 AS rental
  LEFT OUTER JOIN sakila_dlh.dim_store AS store
  ON rental.store_key = store.store_key
  LEFT OUTER JOIN sakila_dlh.dim_customer AS cust
  ON rental.customer_key = cust.customer_key
  LEFT OUTER JOIN sakila_dlh.dim_films AS film
  ON rental.film_key = film.film_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS dimdate
  ON rental.rental_date_key = dimdate.date_key
)

In [0]:
(spark.table("fact_rental_silver_tempview1")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_output_silver1}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_silver1"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc05011a3b0>

In [0]:
%sql
SELECT * FROM fact_rental_silver1

rental_customer_key,rental_film_key,inventory_id,rental_date_key,rental_key,return_date,rental_store_key,title,store_key,manager_staff_id,address_id,last_update,store_address,customer_key,store_id,last_name,active,create_date,film_key,film_title,release_year,rental_duration,rental_rate,rating,category,date_key,full_date
468,134,617,20050525,101,2005-05-31 19:47:04,2,CHAMPION FLATLINERS,2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard,468,1,CARY,1,2006-02-14 22:04:37,134,CHAMPION FLATLINERS,2006,4,4.99,PG,Animation,,
343,82,373,20050525,102,2005-05-31 19:47:10,1,BLOOD ARGONAUTS,1,1,1,2024-05-09T18:36:05Z,47 MySakila Drive,343,1,GRAF,1,2006-02-14 22:04:37,82,BLOOD ARGONAUTS,2006,3,0.99,G,Family,,
384,735,3343,20050525,103,2005-06-03 22:36:42,1,ROBBERS JOON,1,1,1,2024-05-09T18:36:05Z,47 MySakila Drive,384,2,STEPP,1,2006-02-14 22:04:37,735,ROBBERS JOON,2006,7,2.99,PG-13,Children,,
310,932,4281,20050525,104,2005-05-27 15:20:33,1,VALLEY PACKER,1,1,1,2024-05-09T18:36:05Z,47 MySakila Drive,310,2,CABRAL,1,2006-02-14 22:04:37,932,VALLEY PACKER,2006,3,0.99,G,Comedy,,
108,173,794,20050525,105,2005-05-30 12:03:12,2,CONFESSIONS MAGUIRE,2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard,108,1,COLE,1,2006-02-14 22:04:36,173,CONFESSIONS MAGUIRE,2006,7,4.99,PG-13,Drama,,
196,791,3627,20050525,106,2005-06-04 00:01:19,2,SHOW LORD,2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard,196,1,AUSTIN,1,2006-02-14 22:04:36,791,SHOW LORD,2006,3,4.99,PG-13,Documentary,,
317,621,2833,20050525,107,2005-06-03 22:46:09,2,NETWORK PEAK,2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard,317,2,BAUGH,1,2006-02-14 22:04:37,621,NETWORK PEAK,2006,5,2.99,PG-13,Family,,
242,724,3289,20050525,108,2005-05-30 19:40:05,2,REMEMBER DIARY,2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard,242,1,FRAZIER,1,2006-02-14 22:04:36,724,REMEMBER DIARY,2006,5,2.99,R,Family,,
503,233,1044,20050525,109,2005-05-29 20:39:20,1,DISCIPLE MOTHER,1,1,1,2024-05-09T18:36:05Z,47 MySakila Drive,503,1,BARCLAY,1,2006-02-14 22:04:37,233,DISCIPLE MOTHER,2006,3,0.99,PG,Travel,,
19,893,4108,20050525,110,2005-06-03 18:13:49,2,TITANS JERK,2,2,2,2024-05-09T18:36:05Z,28 MySQL Boulevard,19,1,MARTINEZ,1,2006-02-14 22:04:36,893,TITANS JERK,2006,4,4.99,PG,Sci-Fi,,


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_silver1

col_name,data_type,comment
rental_customer_key,bigint,
rental_film_key,bigint,
inventory_id,bigint,
rental_date_key,bigint,
rental_key,bigint,
return_date,string,
rental_store_key,bigint,
title,string,
store_key,int,
manager_staff_id,int,


##### Gold Table: Perform Aggregations

In [0]:
%sql
CREATE OR REPLACE TABLE sakila_dlh.customer_rentals_by_customer_gold AS (
    SELECT
        fact_rental_silver1.customer_key AS rental_customer_key,
        COUNT(*) AS times_rented,
        MAX(fact_rental_silver1.last_name) AS last_name
    FROM
        sakila_dlh.fact_rental_silver1
    GROUP BY
        fact_rental_silver1.customer_key
    ORDER BY
        times_rented DESC
);

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila_dlh.customer_rentals_by_customer_gold

rental_customer_key,times_rented,last_name
19,3,MARTINEZ
7,2,MILLER
408,2,MURRELL
18,2,GARCIA
302,2,SILVERMAN
207,2,CASTILLO
185,2,HARPER
319,2,WEINER
44,2,TURNER
269,2,WALTERS


#### Clean up the File System

In [0]:
#change path
%fs rm -r /FileStore/final_project/