## Final Project: Using Sakila Database
**Amy Smith**

**DS-2002: Data Science Systems**

#### Necessary Imports

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Connecting to Azure MySQL Server, MongoDB Atlas, and Setting Up Data Files

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "aps6cuq-sql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "asmith",
  "password" : "W0rking123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "dsproject.yysm8yq"
atlas_database_name = "sakila"
atlas_user_name = "aps6cuq"
atlas_password = "JpVjKZssyc3PYxhi"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-final-project"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rental_payment_stream_dir = f"{stream_dir}/rental_payment"

rental_payment_output_bronze = f"{database_dir}/fact_rental_payment/bronze"
rental_payment_output_silver = f"{database_dir}/fact_rental_payment/silver"
rental_payment_output_gold   = f"{database_dir}/fact_rental_payment/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rental_payment", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[253]: True

#### Defining Global Functions

In [0]:
# Fetches a DataFrame from MongoDB Atlas database server Using PyMongo.

def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe


# Creates New Collections by Uploading JSON file(s) to the MongoDB Atlas server.

def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Populating Dimensions by Ingesting Reference Data 
#### Fetching reference data from an Azure MySQL database

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/ds2002-final-project/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### A new table that sources dim_date from a table in Azure MySQL

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aps6cuq-mysql.mysql.database.azure.com:3306/sakila",
  dbtable "dim_date",
  user "asmith",
  password "W0rking123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final-project/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20050101,2005-01-01,2005/01/01,01/01/2005,01/01/2005,7,Saturday,1,1,Weekend,53,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050102,2005-01-02,2005/01/02,01/02/2005,02/01/2005,1,Sunday,2,2,Weekend,53,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050103,2005-01-03,2005/01/03,01/03/2005,03/01/2005,2,Monday,3,3,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050104,2005-01-04,2005/01/04,01/04/2005,04/01/2005,3,Tuesday,4,4,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3
20050105,2005-01-05,2005/01/05,01/05/2005,05/01/2005,4,Wednesday,5,5,Weekday,1,January,1,N,1,2005,2005-01,2005Q1,7,3,2005,2005-07,2005Q3


##### A new table that sources inventory table from Azure MySql

In [0]:
%sql
-- Create a Temporary View named "view_inventory" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_inventory
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://aps6cuq-mysql.mysql.database.azure.com:3306/sakila", dbtable "dim_inventory",  
  user "asmith",
  password "W0rking123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_inventory" using data from the view named "view_inventory"
CREATE OR REPLACE TABLE sakila_dlh.dim_inventory
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final-project/sakila_dlh/dim_inventory"
AS SELECT * FROM view_inventory

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,EXTERNAL,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17


#### Fetching Reference Data from a MongoDB Atlas Database
##### Viewing the data files on Databricks

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final-project/source_data/batch/dim_customer.csv,dim_customer.csv,65638,1683930422000
dbfs:/FileStore/ds2002-final-project/source_data/batch/dim_customer.json,dim_customer.json,136808,1683930757000
dbfs:/FileStore/ds2002-final-project/source_data/batch/dim_payment.json,dim_payment.json,2622206,1683930423000
dbfs:/FileStore/ds2002-final-project/source_data/batch/dim_rental.csv,dim_rental.csv,1326452,1683930423000
dbfs:/FileStore/ds2002-final-project/source_data/batch/dim_staff.csv,dim_staff.csv,331,1683931595000
dbfs:/FileStore/ds2002-final-project/source_data/batch/dim_staff.json,dim_staff.json,466,1683930423000


##### Loading customer JSON table into a MongoDB collection

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final-project/source_data/batch'
csv_files = {"dim_customer" : 'dim_customer.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[267]: <pymongo.results.InsertManyResult at 0x7f011f9dd440>

##### Fetching customer table from that same collection

In [0]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila").option("collection", "dim_customer").load()

display(df_customer.head(5))

_id,active,address_id,create_date,customer_id,email,first_name,last_name,last_update,store_id
List(645f0597137e8ac5f772a406),1,5,2006-02-14 22:04:36,1,MARY.SMITH@sakilacustomer.org,MARY,SMITH,2006-02-15 04:57:20,1
List(645f0597137e8ac5f772a407),1,6,2006-02-14 22:04:36,2,PATRICIA.JOHNSON@sakilacustomer.org,PATRICIA,JOHNSON,2006-02-15 04:57:20,1
List(645f0597137e8ac5f772a408),1,7,2006-02-14 22:04:36,3,LINDA.WILLIAMS@sakilacustomer.org,LINDA,WILLIAMS,2006-02-15 04:57:20,1
List(645f0597137e8ac5f772a409),1,8,2006-02-14 22:04:36,4,BARBARA.JONES@sakilacustomer.org,BARBARA,JONES,2006-02-15 04:57:20,2
List(645f0597137e8ac5f772a40a),1,9,2006-02-14 22:04:36,5,ELIZABETH.BROWN@sakilacustomer.org,ELIZABETH,BROWN,2006-02-15 04:57:20,1


In [0]:
%scala
df_customer.printSchema()

##### Creating a new dim_customer table in Databricks using the Spark DataFrame

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
_id,struct,
active,int,
address_id,int,
create_date,string,
customer_id,int,
email,string,
first_name,string,
last_name,string,
last_update,string,
store_id,int,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

_id,active,address_id,create_date,customer_id,email,first_name,last_name,last_update,store_id
List(645f0597137e8ac5f772a406),1,5,2006-02-14 22:04:36,1,MARY.SMITH@sakilacustomer.org,MARY,SMITH,2006-02-15 04:57:20,1
List(645f0597137e8ac5f772a407),1,6,2006-02-14 22:04:36,2,PATRICIA.JOHNSON@sakilacustomer.org,PATRICIA,JOHNSON,2006-02-15 04:57:20,1
List(645f0597137e8ac5f772a408),1,7,2006-02-14 22:04:36,3,LINDA.WILLIAMS@sakilacustomer.org,LINDA,WILLIAMS,2006-02-15 04:57:20,1
List(645f0597137e8ac5f772a409),1,8,2006-02-14 22:04:36,4,BARBARA.JONES@sakilacustomer.org,BARBARA,JONES,2006-02-15 04:57:20,2
List(645f0597137e8ac5f772a40a),1,9,2006-02-14 22:04:36,5,ELIZABETH.BROWN@sakilacustomer.org,ELIZABETH,BROWN,2006-02-15 04:57:20,1


#### Fetching Data from a File System
##### Reading from a CSV file using PySpark

In [0]:
staff_csv = f"{batch_dir}/dim_staff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_id,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16.000+0000


In [0]:
df_staff.printSchema()

root
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,
password,string,
last_update,timestamp,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 2;

staff_id,first_name,last_name,address_id,email,store_id,active,username,password,last_update
1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000
2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15T03:57:16.000+0000


##### Verifying the Four Dimension Tables!

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_staff,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True
,display_query_6,True


### Integrating Reference Data with Real-Time Data
#### Processing streaming orders fact data using AutoLoader
##### Bronze Table: Processing Raw Rental_Payment JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", rental_payment_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rental_payment_stream_dir)
 .createOrReplaceTempView("rental_payment_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rental_payment_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rental_payment_raw_tempview
)

In [0]:
%sql
SELECT * FROM rental_payment_bronze_tempview
LIMIT 10

amount,customer_key,inventory_id,payment_customer_id,payment_date,payment_key,payment_last_update,payment_staff_id,rental_customer_id,rental_date,rental_id,rental_key,rental_last_update,rental_payment_key,rental_staff_id,return_date,staff_key,_rescued_data,receipt_time,source_file
0.99,506,4502,506,2005-08-19 07:26:10,13652,2006-02-15 22:20:32,1,506,2005-05-25 19:12:42,12994,114,2006-02-15 21:30:53,16865,1,2005-06-01 23:10:42,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
1.99,506,4502,506,2005-08-20 23:16:07,13655,2006-02-15 22:20:32,1,506,2005-05-25 19:12:42,14074,114,2006-02-15 21:30:53,16866,1,2005-06-01 23:10:42,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
2.99,506,4502,506,2005-08-21 08:34:26,13656,2006-02-15 22:20:32,1,506,2005-05-25 19:12:42,14337,114,2006-02-15 21:30:53,16867,1,2005-06-01 23:10:42,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
9.99,506,4502,506,2005-08-23 10:02:46,13660,2006-02-15 22:20:33,1,506,2005-05-25 19:12:42,15694,114,2006-02-15 21:30:53,16868,1,2005-06-01 23:10:42,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
1.99,455,749,455,2005-06-16 20:23:30,12284,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,1802,115,2006-02-15 21:30:53,16869,1,2005-05-29 20:17:25,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
2.99,455,749,455,2005-06-17 04:53:35,12285,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,1906,115,2006-02-15 21:30:53,16870,1,2005-05-29 20:17:25,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
8.99,455,749,455,2005-07-08 18:57:30,12288,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,4861,115,2006-02-15 21:30:53,16871,1,2005-05-29 20:17:25,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
2.99,455,749,455,2005-07-08 23:46:38,12289,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,4964,115,2006-02-15 21:30:53,16872,1,2005-05-29 20:17:25,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
6.99,455,749,455,2005-07-10 00:36:38,12290,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,5504,115,2006-02-15 21:30:53,16873,1,2005-05-29 20:17:25,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
4.99,455,749,455,2005-07-27 15:54:19,12292,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,7388,115,2006-02-15 21:30:53,16874,1,2005-05-29 20:17:25,1,,2023-05-13T04:09:15.005+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json


In [0]:
(spark.table("rental_payment_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_payment_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_payment_bronze"))

Out[280]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f011f9aa9a0>

##### Silver Table: Including Reference Data

In [0]:
(spark.readStream
  .table("fact_rental_payment_bronze")
  .createOrReplaceTempView("rental_payment_silver_tempview"))

In [0]:
%sql
SELECT * FROM rental_payment_silver_tempview
LIMIT 10

amount,customer_key,inventory_id,payment_customer_id,payment_date,payment_key,payment_last_update,payment_staff_id,rental_customer_id,rental_date,rental_id,rental_key,rental_last_update,rental_payment_key,rental_staff_id,return_date,staff_key,_rescued_data,receipt_time,source_file
0.99,506,4502,506,2005-08-19 07:26:10,13652,2006-02-15 22:20:32,1,506,2005-05-25 19:12:42,12994,114,2006-02-15 21:30:53,16865,1,2005-06-01 23:10:42,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
1.99,506,4502,506,2005-08-20 23:16:07,13655,2006-02-15 22:20:32,1,506,2005-05-25 19:12:42,14074,114,2006-02-15 21:30:53,16866,1,2005-06-01 23:10:42,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
2.99,506,4502,506,2005-08-21 08:34:26,13656,2006-02-15 22:20:32,1,506,2005-05-25 19:12:42,14337,114,2006-02-15 21:30:53,16867,1,2005-06-01 23:10:42,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
9.99,506,4502,506,2005-08-23 10:02:46,13660,2006-02-15 22:20:33,1,506,2005-05-25 19:12:42,15694,114,2006-02-15 21:30:53,16868,1,2005-06-01 23:10:42,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
1.99,455,749,455,2005-06-16 20:23:30,12284,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,1802,115,2006-02-15 21:30:53,16869,1,2005-05-29 20:17:25,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
2.99,455,749,455,2005-06-17 04:53:35,12285,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,1906,115,2006-02-15 21:30:53,16870,1,2005-05-29 20:17:25,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
8.99,455,749,455,2005-07-08 18:57:30,12288,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,4861,115,2006-02-15 21:30:53,16871,1,2005-05-29 20:17:25,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
2.99,455,749,455,2005-07-08 23:46:38,12289,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,4964,115,2006-02-15 21:30:53,16872,1,2005-05-29 20:17:25,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
6.99,455,749,455,2005-07-10 00:36:38,12290,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,5504,115,2006-02-15 21:30:53,16873,1,2005-05-29 20:17:25,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json
4.99,455,749,455,2005-07-27 15:54:19,12292,2006-02-15 22:19:11,1,455,2005-05-25 19:13:25,7388,115,2006-02-15 21:30:53,16874,1,2005-05-29 20:17:25,1,,2023-05-13T03:39:54.741+0000,dbfs:/FileStore/ds2002-final-project/source_data/stream/rental_payment/rental_payment3.json


In [0]:
%sql
DESCRIBE EXTENDED rental_payment_silver_tempview

col_name,data_type,comment
amount,double,
customer_key,bigint,
inventory_id,bigint,
payment_customer_id,bigint,
payment_date,string,
payment_key,bigint,
payment_last_update,string,
payment_staff_id,bigint,
rental_customer_id,bigint,
rental_date,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rental_payment_silver_tempview AS (
  SELECT
    `rp`.`payment_key`,
    `rp`.`customer_key`,
    `rp`.`staff_key`,
    `rp`.`rental_date`,
    `rp`.`inventory_id`,
    `rp`.`rental_customer_id`,
    `rp`.`return_date`,
    `rp`.`rental_staff_id`,
    `rp`.`payment_customer_id`,
    `rp`.`payment_staff_id`,
    `rp`.`rental_id`,
    `rp`.`amount`,
    `rp`.`payment_date`,
    `rp`.`payment_last_update`,
    `i`.`inventory_id` AS i_inventory_id,
    `i`.`film_id`,
    `i`.`store_id` AS i_store_id,
    `i`.`last_update` AS i_last_update,
    `s`.`staff_id`,
    `s`.`first_name`,
    `s`.`last_name`,
    `s`.`address_id` AS s_address_id,
    `s`.`email` AS s_email,
    `s`.`store_id` AS s_store_id,
    `s`.`active` AS s_active,
    `s`.`username`,
    `s`.`password`,
    `s`.`last_update` AS s_last_update,
    `c`.`customer_id`,
    `c`.`store_id`,
    `c`.`first_name` AS c_first_name,
    `c`.`last_name` AS c_last_name,
    `c`.`email` AS c_email,
    `c`.`address_id` AS c_address_id,
    `c`.`active` AS c_active,
    `c`.`create_date`,
    `c`.`last_update` AS c_last_update,
    `d`.`date_key`,
    `d`.`full_date`,
    `d`.`date_name`,
    `d`.`date_name_us`,
    `d`.`date_name_eu`,
    `d`.`day_of_week`,
    `d`.`day_name_of_week`,
    `d`.`day_of_month`,
    `d`.`day_of_year`,
    `d`.`weekday_weekend`,
    `d`.`week_of_year`,
    `d`.`month_name`,
    `d`.`month_of_year`,
    `d`.`is_last_day_of_month`,
    `d`.`calendar_quarter`,
    `d`.`calendar_year`,
    `d`.`calendar_year_month`,
    `d`.`calendar_year_qtr`,
    `d`.`fiscal_month_of_year`,
    `d`.`fiscal_quarter`,
    `d`.`fiscal_year`,
    `d`.`fiscal_year_month`,
    `d`.`fiscal_year_qtr`
  FROM rental_payment_silver_tempview as rp
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_id = rp.staff_key
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_id = rp.customer_key
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_id = rp.inventory_id
  INNER JOIN sakila_dlh.dim_date AS d
  ON d.full_date = SUBSTRING(rp.payment_date, 1, 10) --Parsing the first 10 letters of payment_date to have the same format as full_date
)

In [0]:
(spark.table("fact_rental_payment_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rental_payment_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rental_payment_silver"))

Out[285]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f011f9bc760>

In [0]:
%sql
SELECT * FROM fact_rental_payment_silver
LIMIT 10

payment_key,customer_key,staff_key,rental_date,inventory_id,rental_customer_id,return_date,rental_staff_id,payment_customer_id,payment_staff_id,rental_id,amount,payment_date,payment_last_update,i_inventory_id,film_id,i_store_id,i_last_update,staff_id,first_name,last_name,s_address_id,s_email,s_store_id,s_active,username,password,s_last_update,customer_id,store_id,c_first_name,c_last_name,c_email,c_address_id,c_active,create_date,c_last_update,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
13652,506,1,2005-05-25 19:12:42,4502,506,2005-06-01 23:10:42,1,506,1,12994,0.99,2005-08-19 07:26:10,2006-02-15 22:20:32,4502,982,2,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,506,2,LESLIE,SEWARD,LESLIE.SEWARD@sakilacustomer.org,511,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050819,2005-08-19,2005/08/19,08/19/2005,19/08/2005,6,Friday,19,231,Weekday,33,August,8,N,3,2005,2005-08,2005Q3,2,1,2006,2006-02,2006Q1
13655,506,1,2005-05-25 19:12:42,4502,506,2005-06-01 23:10:42,1,506,1,14074,1.99,2005-08-20 23:16:07,2006-02-15 22:20:32,4502,982,2,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,506,2,LESLIE,SEWARD,LESLIE.SEWARD@sakilacustomer.org,511,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050820,2005-08-20,2005/08/20,08/20/2005,20/08/2005,7,Saturday,20,232,Weekend,33,August,8,N,3,2005,2005-08,2005Q3,2,1,2006,2006-02,2006Q1
13656,506,1,2005-05-25 19:12:42,4502,506,2005-06-01 23:10:42,1,506,1,14337,2.99,2005-08-21 08:34:26,2006-02-15 22:20:32,4502,982,2,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,506,2,LESLIE,SEWARD,LESLIE.SEWARD@sakilacustomer.org,511,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050821,2005-08-21,2005/08/21,08/21/2005,21/08/2005,1,Sunday,21,233,Weekend,33,August,8,N,3,2005,2005-08,2005Q3,2,1,2006,2006-02,2006Q1
13660,506,1,2005-05-25 19:12:42,4502,506,2005-06-01 23:10:42,1,506,1,15694,9.99,2005-08-23 10:02:46,2006-02-15 22:20:33,4502,982,2,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,506,2,LESLIE,SEWARD,LESLIE.SEWARD@sakilacustomer.org,511,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050823,2005-08-23,2005/08/23,08/23/2005,23/08/2005,3,Tuesday,23,235,Weekday,34,August,8,N,3,2005,2005-08,2005Q3,2,1,2006,2006-02,2006Q1
12284,455,1,2005-05-25 19:13:25,749,455,2005-05-29 20:17:25,1,455,1,1802,1.99,2005-06-16 20:23:30,2006-02-15 22:19:11,749,164,1,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,455,2,JON,WILES,JON.WILES@sakilacustomer.org,460,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050616,2005-06-16,2005/06/16,06/16/2005,16/06/2005,5,Thursday,16,167,Weekday,24,June,6,N,2,2005,2005-06,2005Q2,12,4,2005,2005-12,2005Q4
12285,455,1,2005-05-25 19:13:25,749,455,2005-05-29 20:17:25,1,455,1,1906,2.99,2005-06-17 04:53:35,2006-02-15 22:19:11,749,164,1,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,455,2,JON,WILES,JON.WILES@sakilacustomer.org,460,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050617,2005-06-17,2005/06/17,06/17/2005,17/06/2005,6,Friday,17,168,Weekday,24,June,6,N,2,2005,2005-06,2005Q2,12,4,2005,2005-12,2005Q4
12288,455,1,2005-05-25 19:13:25,749,455,2005-05-29 20:17:25,1,455,1,4861,8.99,2005-07-08 18:57:30,2006-02-15 22:19:11,749,164,1,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,455,2,JON,WILES,JON.WILES@sakilacustomer.org,460,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050708,2005-07-08,2005/07/08,07/08/2005,08/07/2005,6,Friday,8,189,Weekday,27,July,7,N,3,2005,2005-07,2005Q3,1,1,2006,2006-01,2006Q1
12289,455,1,2005-05-25 19:13:25,749,455,2005-05-29 20:17:25,1,455,1,4964,2.99,2005-07-08 23:46:38,2006-02-15 22:19:11,749,164,1,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,455,2,JON,WILES,JON.WILES@sakilacustomer.org,460,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050708,2005-07-08,2005/07/08,07/08/2005,08/07/2005,6,Friday,8,189,Weekday,27,July,7,N,3,2005,2005-07,2005Q3,1,1,2006,2006-01,2006Q1
12290,455,1,2005-05-25 19:13:25,749,455,2005-05-29 20:17:25,1,455,1,5504,6.99,2005-07-10 00:36:38,2006-02-15 22:19:11,749,164,1,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,455,2,JON,WILES,JON.WILES@sakilacustomer.org,460,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050710,2005-07-10,2005/07/10,07/10/2005,10/07/2005,1,Sunday,10,191,Weekend,27,July,7,N,3,2005,2005-07,2005Q3,1,1,2006,2006-01,2006Q1
12292,455,1,2005-05-25 19:13:25,749,455,2005-05-29 20:17:25,1,455,1,7388,4.99,2005-07-27 15:54:19,2006-02-15 22:19:11,749,164,1,2006-02-15 05:09:17,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15T03:57:16.000+0000,455,2,JON,WILES,JON.WILES@sakilacustomer.org,460,1,2006-02-14 22:04:37,2006-02-15 04:57:20,20050727,2005-07-27,2005/07/27,07/27/2005,27/07/2005,4,Wednesday,27,208,Weekday,30,July,7,N,3,2005,2005-07,2005Q3,1,1,2006,2006-01,2006Q1


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_rental_payment_silver

col_name,data_type,comment
payment_key,bigint,
customer_key,bigint,
staff_key,bigint,
rental_date,string,
inventory_id,bigint,
rental_customer_id,bigint,
return_date,string,
rental_staff_id,bigint,
payment_customer_id,bigint,
payment_staff_id,bigint,


##### Gold Table: Performing Aggregations
###### First aggregation groups total amount paid by staff last name
###### Second aggregation groups customer name, film ID, and film count by customer ID

In [0]:
%sql

SELECT last_name,
SUM (amount) AS total_amount
FROM sakila_dlh.fact_rental_payment_silver
GROUP BY last_name
ORDER BY total_amount DESC

last_name,total_amount
Stephens,468591.3499993294
Hillyer,467143.1699993312


In [0]:
%sql

SELECT distinct customer_id AS CustomerID
  , c_last_name AS CustomerName
  , film_id AS FilmID
  , ic.FilmCount
FROM sakila_dlh.fact_rental_payment_silver AS rp
INNER JOIN (
  SELECT customer_id AS CustomerID
  , COUNT(film_id) AS FilmCount
  FROM sakila_dlh.fact_rental_payment_silver
  GROUP BY customer_id
) AS ic
ON ic.CustomerID = rp.customer_id
ORDER BY FilmCount DESC
LIMIT 10

CustomerID,CustomerName,FilmID,FilmCount
148,HUNT,709,1056
148,HUNT,166,1056
148,HUNT,285,1056
148,HUNT,374,1056
148,HUNT,708,1056
148,HUNT,610,1056
148,HUNT,552,1056
148,HUNT,39,1056
148,HUNT,957,1056
148,HUNT,645,1056


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002-final-project/