###__Section 1: Prerequisites__

__1.0 Import Required Libraries__

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

__2.0 Instantiate Global Variables__

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "zdc4tp-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "netfluxter_dw"

connection_properties = {
  "user" : "zdc4tp",
  "password" : "Passw0rd789",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.isvb1dn"
atlas_database_name = "netfluxter_dw"
atlas_user_name = "chaizhang777"
atlas_password = "GPa826DjTKvr15"

# Data Files (JSON) Information ###############################
dst_database = "netfluxter_dlh"

base_dir = "dbfs:/FileStore/final_project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/movie_rental"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

__3.0 Define Global Functions__

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

###__Section 2: Populate Dimensions by Ingesting Reference Data__

__1.0 Fetch Reference Data From an Azure MySQL Database__

1.1 Create a New Databricks Metadata Database

In [0]:
%sql
DROP DATABASE IF EXISTS netfluxter_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS netfluxter_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final_project_data/netfluxter_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

1.2 Create a New Table that Sources Data Dimension Data from a Table in an Azure MySQL Database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://zdc4tp-mysql.mysql.database.azure.com:3306/netfluxter_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "zdc4tp",    --Replace with your User Name
  password "Passw0rd789"  --Replace with you password
)

In [0]:
%sql
USE DATABASE netfluxter_dlh;

CREATE OR REPLACE TABLE netfluxter_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_project_data/netfluxter_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED netfluxter_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM netfluxter_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


1.3 Create a New Table that Courses Customer Dimension Data from an Azure MySQL Database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customers
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://zdc4tp-mysql.mysql.database.azure.com:3306/netfluxter_dw", --Replace with your Server Name
  dbtable "dim_customers",
  user "zdc4tp",    --Replace with your User Name
  password "Passw0rd789"  --Replace with you password
)

In [0]:
%sql
USE DATABASE netfluxter_dlh;

CREATE OR REPLACE TABLE netfluxter_dlh.dim_customers
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_project_data/netfluxter_dlh/dim_customers"
AS SELECT * FROM view_customers

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED netfluxter_dlh.dim_customers;

col_name,data_type,comment
customer_key,bigint,
customer_id,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
store_id,bigint,
active,varchar(65535),
,,
# Delta Statistics Columns,,
Column Names,"first_name, customer_id, email, store_id, last_name, customer_key, active",


In [0]:
%sql
SELECT * FROM netfluxter_dlh.dim_customers LIMIT 5

customer_key,customer_id,first_name,last_name,email,store_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,active
2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,active
3,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1,active
4,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,2,active
5,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1,active


__2.0 Fetch Reference Data from a MongoDB Atlas Database__

2.1 View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/final_project_data/movie_rental/batch'

path,name,size,modificationTime
dbfs:/FileStore/final_project_data/movie_rental/batch/Netfluxter_DimMovies.csv,Netfluxter_DimMovies.csv,389909,1715020426000
dbfs:/FileStore/final_project_data/movie_rental/batch/Netfluxter_DimStores.json,Netfluxter_DimStores.json,394,1715020426000


2.2 Create a New MongoBD Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/final_project_data/movie_rental/batch'
json_files = {"stores" : 'Netfluxter_DimStores.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f4e1c1d0500>

2.3.1 Fetch Store Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "chaizhang777"
val pwd = "GPa826DjTKvr15"
val clusterName = "cluster0.isvb1dn"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_stores = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "netfluxter_dw")
.option("collection", "stores").load()
// address2 is not included because all values are null under this column
.select("store_key", "store_id", "address", "city", "district", "postal_code", "country")

display(df_stores)

store_key,store_id,address,city,district,postal_code,country
1,1,47 MySakila Drive,Lethbridge,Alberta,,Canada
2,2,28 MySQL Boulevard,Woodridge,QLD,,Australia


In [0]:
%scala
df_stores.printSchema()

2.3.2 Use the Spark DataFrame to Create a New Store Dimension Table in the Databricks Metadata Database (netfluxter_dlh)

In [0]:
%scala
df_stores.write.format("delta").mode("overwrite").saveAsTable("netfluxter_dlh.dim_stores")

In [0]:
%sql
DESCRIBE EXTENDED netfluxter_dlh.dim_stores

col_name,data_type,comment
store_key,int,
store_id,int,
address,string,
city,string,
district,string,
postal_code,string,
country,string,
,,
# Delta Statistics Columns,,
Column Names,"city, store_id, country, postal_code, address, store_key, district",


In [0]:
%sql
SELECT * FROM netfluxter_dlh.dim_stores

store_key,store_id,address,city,district,postal_code,country
1,1,47 MySakila Drive,Lethbridge,Alberta,,Canada
2,2,28 MySQL Boulevard,Woodridge,QLD,,Australia


__3.0 Fetch Data from a File System__

3.1 Use PySpark to Read From a CSV File

In [0]:
movies_csv = f"{batch_dir}/Netfluxter_DimMovies.csv"

df_movies = spark.read.format('csv').options(header='true', inferSchema='true').load(movies_csv)
display(df_movies)

movie_key,movie_id,title,director,cast,country,release_year,rating,duration,listed_in,description
1,s1,Dick Johnson Is Dead,Kirsten Johnson,,United States,2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmmaker Kirsten Johnson stages his death in inventive and comical ways to help them both face the inevitable."
2,s7,My Little Pony: A New Generation,"Robert Cullen, José Luis Ucha","Vanessa Hudgens, Kimiko Glenn, James Marsden, Sofia Carson, Liza Koshy, Ken Jeong, Elizabeth Perkins, Jane Krakowski, Michael McKean, Phil LaMarr",,2021,PG,91 min,Children & Family Movies,"Equestria's divided. But a bright-eyed hero believes Earth Ponies, Pegasi and Unicorns should be pals — and, hoof to heart, she’s determined to prove it."
3,s8,Sankofa,Haile Gerima,"Kofi Ghanaba, Oyafunmike Ogunlano, Alexandra Duah, Nick Medley, Mutabaruka, Afemo Omilami, Reggie Carter, Mzuri","United States, Ghana, Burkina Faso, United Kingdom, Germany, Ethiopia",1993,TV-MA,125 min,"Dramas, Independent Movies, International Movies","On a photo shoot in Ghana, an American model slips back in time, becomes enslaved on a plantation and bears witness to the agony of her ancestral past."
4,s10,The Starling,Theodore Melfi,"Melissa McCarthy, Chris O'Dowd, Kevin Kline, Timothy Olyphant, Daveed Diggs, Skyler Gisondo, Laura Harrier, Rosalind Chao, Kimberly Quinn, Loretta Devine, Ravi Kapoor",United States,2021,PG-13,104 min,"Comedies, Dramas",A woman adjusting to life after a loss contends with a feisty bird that's taken over her garden — and a husband who's struggling to find a way forward.
5,s13,Je Suis Karl,Christian Schwochow,"Luna Wedler, Jannis Niewöhner, Milan Peschel, Edin Hasanović, Anna Fialová, Marlon Boess, Victor Boccard, Fleur Geffrier, Aziz Dyab, Mélanie Fouché, Elizaveta Maximová","Germany, Czech Republic",2021,TV-MA,127 min,"Dramas, International Movies","After most of her family is murdered in a terrorist bombing, a young woman is unknowingly lured into joining the very group that killed them."
6,s14,Confessions of an Invisible Girl,Bruno Garotti,"Klara Castanho, Lucca Picon, Júlia Gomes, Marcus Bessa, Kiria Malheiros, Fernanda Concon, Gabriel Lima, Caio Cabral, Leonardo Cidade, Jade Cardozo",,2021,TV-PG,91 min,"Children & Family Movies, Comedies","When the clever but socially-awkward Tetê joins a new school, she'll do anything to fit in. But the queen bee among her classmates has other ideas."
7,s17,Europe's Most Dangerous Man: Otto Skorzeny in Spain,"Pedro de Echave García, Pablo Azorín Williams",,,2020,TV-MA,67 min,"Documentaries, International Movies","Declassified documents reveal the post-WWII life of Otto Skorzeny, a close Hitler ally who escaped to Spain and became an adviser to world presidents."
8,s19,Intrusion,Adam Salky,"Freida Pinto, Logan Marshall-Green, Robert John Burke, Megan Elisabeth Kelly, Sarah Minnich, Hayes Hargrove, Mark Sivertsen, Brandon Fierro, Antonio Valles, Clint Obenchain",,2021,TV-14,94 min,Thrillers,"After a deadly home invasion at a couple’s new dream house, the traumatized wife searches for answers — and learns the real danger is just beginning."
9,s23,Avvai Shanmughi,K.S. Ravikumar,"Kamal Hassan, Meena, Gemini Ganesan, Heera Rajgopal, Nassar, S.P. Balasubrahmanyam",,1996,TV-PG,161 min,"Comedies, International Movies","Newly divorced and denied visitation rights with his daughter, a doting father disguises himself as a gray-haired nanny in order to spend time with her."
10,s24,Go! Go! Cory Carson: Chrissy Takes the Wheel,"Alex Woo, Stanley Moore","Maisie Benson, Paul Killam, Kerry Gudjohnsen, AC Lim",,2021,TV-Y,61 min,Children & Family Movies,"From arcade games to sled days and hiccup cures, Cory Carson’s curious little sister Chrissy speeds off on her own for fun and adventure all over town!"


In [0]:
df_movies.printSchema()

root
 |-- movie_key: integer (nullable = true)
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [0]:
df_movies.write.format("delta").mode("overwrite").saveAsTable("netfluxter_dlh.dim_movies")

In [0]:
%sql
DESCRIBE EXTENDED netfluxter_dlh.dim_movies;

col_name,data_type,comment
movie_key,int,
movie_id,string,
title,string,
director,string,
cast,string,
country,string,
release_year,string,
rating,string,
duration,string,
listed_in,string,


In [0]:
%sql
SELECT * FROM netfluxter_dlh.dim_movies LIMIT 5;

movie_key,movie_id,title,director,cast,country,release_year,rating,duration,listed_in,description
1,s1,Dick Johnson Is Dead,Kirsten Johnson,,United States,2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmmaker Kirsten Johnson stages his death in inventive and comical ways to help them both face the inevitable."
2,s7,My Little Pony: A New Generation,"Robert Cullen, José Luis Ucha","Vanessa Hudgens, Kimiko Glenn, James Marsden, Sofia Carson, Liza Koshy, Ken Jeong, Elizabeth Perkins, Jane Krakowski, Michael McKean, Phil LaMarr",,2021,PG,91 min,Children & Family Movies,"Equestria's divided. But a bright-eyed hero believes Earth Ponies, Pegasi and Unicorns should be pals — and, hoof to heart, she’s determined to prove it."
3,s8,Sankofa,Haile Gerima,"Kofi Ghanaba, Oyafunmike Ogunlano, Alexandra Duah, Nick Medley, Mutabaruka, Afemo Omilami, Reggie Carter, Mzuri","United States, Ghana, Burkina Faso, United Kingdom, Germany, Ethiopia",1993,TV-MA,125 min,"Dramas, Independent Movies, International Movies","On a photo shoot in Ghana, an American model slips back in time, becomes enslaved on a plantation and bears witness to the agony of her ancestral past."
4,s10,The Starling,Theodore Melfi,"Melissa McCarthy, Chris O'Dowd, Kevin Kline, Timothy Olyphant, Daveed Diggs, Skyler Gisondo, Laura Harrier, Rosalind Chao, Kimberly Quinn, Loretta Devine, Ravi Kapoor",United States,2021,PG-13,104 min,"Comedies, Dramas",A woman adjusting to life after a loss contends with a feisty bird that's taken over her garden — and a husband who's struggling to find a way forward.
5,s13,Je Suis Karl,Christian Schwochow,"Luna Wedler, Jannis Niewöhner, Milan Peschel, Edin Hasanović, Anna Fialová, Marlon Boess, Victor Boccard, Fleur Geffrier, Aziz Dyab, Mélanie Fouché, Elizaveta Maximová","Germany, Czech Republic",2021,TV-MA,127 min,"Dramas, International Movies","After most of her family is murdered in a terrorist bombing, a young woman is unknowingly lured into joining the very group that killed them."


__4.0 Verify Dimension Tables__

In [0]:
%sql
USE netfluxter_dlh;
SHOW TABLES

database,tableName,isTemporary
netfluxter_dlh,dim_customers,False
netfluxter_dlh,dim_date,False
netfluxter_dlh,dim_movies,False
netfluxter_dlh,dim_stores,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,rentals_bronze_tempview,True
,rentals_raw_tempview,True
,rentals_silver_tempview,True


###__Section 3: Integrate Reference Data with Real-Time Data__

__1.0 Use AutoLoader to Process Movie Rentals Fact Data__

1.1 Bronze Table: Process 'Raw' JSON Data

In [0]:
#* Reading from a stream *#
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_id,fact_rental_key,movie_id,payment_amount,rental_date_key,rental_duration,rental_id,return_date_key,store_id,_rescued_data,receipt_time,source_file
35,601,s6281,6.99,20050731,6.0,9579,20050806.0,2,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
22,602,s4611,4.99,20050731,5.0,9580,20050805.0,1,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
14,603,s5888,2.99,20050731,5.0,9592,20050805.0,2,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
28,604,s8488,4.99,20050731,5.0,9593,20050805.0,1,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
3,605,s8449,2.99,20050731,3.0,9595,20050803.0,1,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
16,606,s8351,5.99,20050731,6.0,9610,20050806.0,2,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
7,607,s4850,7.99,20050731,6.0,9624,20050806.0,1,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
12,608,s7242,5.99,20050731,7.0,9627,20050808.0,1,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
8,609,s4082,2.99,20050731,2.0,9629,20050802.0,2,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
27,610,s7155,5.99,20050731,9.0,9636,20050809.0,2,,2024-05-06T20:37:30.502Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json


In [0]:
#* Writing data out to a bronze table *#
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f4e081052a0>

1.2 Silver Table: Include Reference Data

In [0]:
#* Read from bronze and put data into a silver temp view *#
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_id,fact_rental_key,movie_id,payment_amount,rental_date_key,rental_duration,rental_id,return_date_key,store_id,_rescued_data,receipt_time,source_file
35,601,s6281,6.99,20050731,6.0,9579,20050806.0,2,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
22,602,s4611,4.99,20050731,5.0,9580,20050805.0,1,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
14,603,s5888,2.99,20050731,5.0,9592,20050805.0,2,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
28,604,s8488,4.99,20050731,5.0,9593,20050805.0,1,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
3,605,s8449,2.99,20050731,3.0,9595,20050803.0,1,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
16,606,s8351,5.99,20050731,6.0,9610,20050806.0,2,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
7,607,s4850,7.99,20050731,6.0,9624,20050806.0,1,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
12,608,s7242,5.99,20050731,7.0,9627,20050808.0,1,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
8,609,s4082,2.99,20050731,2.0,9629,20050802.0,2,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json
27,610,s7155,5.99,20050731,9.0,9636,20050809.0,2,,2024-05-06T20:37:44.839Z,dbfs:/FileStore/final_project_data/movie_rental/stream/rentals/Netfluxter_Fact_Rentals03.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_id,bigint,
fact_rental_key,bigint,
movie_id,string,
payment_amount,double,
rental_date_key,bigint,
rental_duration,bigint,
rental_id,bigint,
return_date_key,bigint,
store_id,bigint,
_rescued_data,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT
    rst.customer_id,
    rst.fact_rental_key,
    rst.movie_id,
    rst.payment_amount,
    rst.rental_date_key,
    dd.full_date AS rental_full_date,
    dd.day_of_week AS rental_date_of_week,
    dd.month_name AS rental_month_name,
    rst.return_date_key,
    dd.full_date AS return_full_date,
    dd.day_of_week AS return_date_of_week,
    dd.month_name AS return_month_name,
    rst.rental_duration,
    dm.title AS movie_title,
    ds.store_id AS rental_store_id,
    dc.customer_key,
    dc.first_name AS customer_first_name,
    dc.last_name AS customer_last_name,
    dc.email AS customer_email,
    dc.active AS customer_active

  FROM
    rentals_silver_tempview AS rst

  JOIN 
    netfluxter_dlh.dim_date dd ON rst.rental_date_key = dd.date_key
  JOIN 
    netfluxter_dlh.dim_movies dm ON rst.movie_id = dm.movie_id
  JOIN 
    netfluxter_dlh.dim_stores ds ON rst.store_id = ds.store_id
  JOIN
    netfluxter_dlh.dim_customers dc ON rst.customer_id = dc.customer_id
)

In [0]:
#* Read from temp view and write data to a delta table *#
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f4dfbf83eb0>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

customer_id,fact_rental_key,movie_id,payment_amount,rental_date_key,rental_full_date,rental_date_of_week,rental_month_name,return_date_key,return_full_date,return_date_of_week,return_month_name,rental_duration,movie_title,rental_store_id,customer_key,customer_first_name,customer_last_name,customer_email,customer_active
23,617,s817,3.99,20050731,2005-07-31,1,July,20050808.0,2005-07-31,1,July,7.0,The Interpreter,2,23,SARAH,LEWIS,SARAH.LEWIS@sakilacustomer.org,active
13,619,s744,7.99,20050731,2005-07-31,1,July,20050809.0,2005-07-31,1,July,8.0,Copenhagen,2,13,KAREN,JACKSON,KAREN.JACKSON@sakilacustomer.org,active
5,622,s959,3.99,20050731,2005-07-31,1,July,20050808.0,2005-07-31,1,July,8.0,Seven,1,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,active
25,624,s440,2.99,20050731,2005-07-31,1,July,20050806.0,2005-07-31,1,July,5.0,Deep,1,25,DEBORAH,WALKER,DEBORAH.WALKER@sakilacustomer.org,active
24,625,s1010,0.99,20050731,2005-07-31,1,July,20050802.0,2005-07-31,1,July,2.0,Rudra: Secret of the Black Moon,2,24,KIMBERLY,LEE,KIMBERLY.LEE@sakilacustomer.org,active
13,630,s472,4.99,20050731,2005-07-31,1,July,20050802.0,2005-07-31,1,July,1.0,Day of Destiny,2,13,KAREN,JACKSON,KAREN.JACKSON@sakilacustomer.org,active
19,631,s240,9.99,20050731,2005-07-31,1,July,20050809.0,2005-07-31,1,July,8.0,Boomika (Telugu),1,19,RUTH,MARTINEZ,RUTH.MARTINEZ@sakilacustomer.org,active
23,634,s878,6.99,20050731,2005-07-31,1,July,20050807.0,2005-07-31,1,July,7.0,Cinema Bandi,2,23,SARAH,LEWIS,SARAH.LEWIS@sakilacustomer.org,active
2,635,s1090,2.99,20050731,2005-07-31,1,July,20050803.0,2005-07-31,1,July,2.0,Two Distant Strangers,1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,active
29,658,s72,5.99,20050801,2005-08-01,2,August,20050807.0,2005-08-01,2,August,5.0,A StoryBots Space Adventure,2,29,ANGELA,HERNANDEZ,ANGELA.HERNANDEZ@sakilacustomer.org,active


In [0]:
%sql
DESCRIBE EXTENDED netfluxter_dlh.fact_rentals_silver

col_name,data_type,comment
customer_id,bigint,
fact_rental_key,bigint,
movie_id,string,
payment_amount,double,
rental_date_key,bigint,
rental_full_date,date,
rental_date_of_week,int,
rental_month_name,varchar(10),
return_date_key,bigint,
return_full_date,date,


1.3 Gold Table: Perform Aggregations

Create a Gold table. This table should include the number of movies rented out per customer each month, along with customer's ID, first and last name.

In [0]:
%sql
CREATE OR REPLACE TABLE netfluxter_dlh.fact_monthly_rentals_by_customer_gold AS
SELECT 
    dc.customer_id,
    dc.first_name,
    dc.last_name,
    dd.month_name AS rental_month,
    COUNT(rst.movie_id) AS movies_rented
FROM 
    netfluxter_dlh.fact_rentals_silver AS rst
JOIN 
    netfluxter_dlh.dim_date dd ON rst.rental_date_key = dd.date_key
JOIN 
    netfluxter_dlh.dim_customers dc ON rst.customer_id = dc.customer_id
GROUP BY 
    dc.customer_id,
    dc.first_name,
    dc.last_name,
    dd.month_name;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM netfluxter_dlh.fact_monthly_rentals_by_customer_gold

customer_id,first_name,last_name,rental_month,movies_rented
3,LINDA,WILLIAMS,June,1
17,DONNA,THOMPSON,June,1
20,SHARON,ROBINSON,June,1
29,ANGELA,HERNANDEZ,June,2
1,MARY,SMITH,August,2
5,ELIZABETH,BROWN,June,1
15,HELEN,HARRIS,June,1
19,RUTH,MARTINEZ,July,1
31,BRENDA,WRIGHT,July,1
4,BARBARA,JONES,August,4


###__Section 4: Clean Up__

In [0]:
%fs rm -r /FileStore/final_project_data/