# DS 2002 Final Project
#### Ashley Manzanares (fnv2vx)
This project utilizes the [Sakila](https://dev.mysql.com/doc/sakila/en/sakila-preface.html) Data Warehouse that was created in [Project 1](https://github.com/amanzanares410/DS2002-Project1/tree/main). This project creates a Data Lakehouse for a rental business. It develops a rentals fact table that records each rental, including a unique identifier for each rental, the associated inventory item, customer, and staff member involved, as well as the rental and return dates.

To recreate this project in Azure Databricks, the Spark Configurations must include your MongoDB connection string and the following libraries must be installed:
- `pymongo[srv]==3.11`
- `org.mongodb.spark:mongo-spark-connector_2.12:3.0.1`

The data located in the [GitHub repo](https://github.com/amanzanares410/DS2002-Final) must also be uploaded to DBFS beforehand.

In [0]:
# import statements
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

### Project Set Up
#### Instantiating Global Variables

In [0]:
# ---- Azure MySQL server (JDBC source) ------------------------
jdbc_hostname = "amanzanares-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

# NOTE(review): credentials are stored in plain text in this notebook;
# consider moving them to a Databricks secret scope.
connection_properties = dict(
    user="amanzanares",
    password="Passw0rd123!",
    driver="org.mariadb.jdbc.Driver",
)

# ---- MongoDB Atlas -------------------------------------------
atlas_cluster_name = "ds2002.x5wij4u"
atlas_database_name = "sakila_dw"
atlas_user_name = "fnv2vx"
atlas_password = "Passw0rd123!"

# ---- DBFS locations ------------------------------------------
# Name of the lakehouse database and the directory that backs it.
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/final_data"
database_dir = base_dir + "/" + dst_database

# Source files: batch (CSV/JSON) and streaming (JSON) inputs.
data_dir = base_dir + "/sakila"
batch_dir = data_dir + "/batch"
stream_dir = data_dir + "/stream"

rentals_stream_dir = stream_dir + "/rentals"

# One output directory per layer of the rentals fact table.
rentals_output_bronze, rentals_output_silver, rentals_output_gold = (
    f"{database_dir}/fact_rentals/{layer}" for layer in ("bronze", "silver", "gold")
)

# ---- Reset state from any previous run -----------------------
# Remove the streaming outputs first, then the whole database directory
# (second argument True = recursive).
dbutils.fs.rm(database_dir + "/fact_rentals", True)
dbutils.fs.rm(database_dir, True)

False

#### Defining Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix (the part before ``.mongodb.net``).
        db_name: Database that contains ``collection``.
        collection: Name of the collection to query.
        conditions: Query filter document; a falsy value matches every document.
        projection: Fields to include/exclude; a falsy value returns whole documents.
        sort: Sort specification handed to ``Cursor.sort``; a falsy value leaves
            the result unsorted.

    Returns:
        A ``pd.DataFrame`` (``pd`` is ``pyspark.pandas`` in this notebook) with
        one row per matching document.
    """
    # NOTE(review): user name and password are interpolated as-is; values that
    # contain URI-reserved characters (e.g. "@", ":", "/") must be
    # percent-escaped by the caller.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each of filter / projection / sort independently, so a filter
        # is honoured even when no projection or sort is supplied.
        cursor = client[db_name][collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        return pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even when the query fails.
        client.close()

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB collections from JSON files.

    Each collection is dropped and then repopulated with the documents parsed
    from its JSON file.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix (the part before ``.mongodb.net``).
        db_name: Database in which the collections are created.
        src_file_path: Directory that contains the JSON files.
        json_files: Mapping of collection name -> JSON file name, relative to
            ``src_file_path``.

    Returns:
        The result of the last ``insert_many`` call, or ``None`` when
        ``json_files`` is empty.
    """
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty mapping returns None instead of raising.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(src_file_path, file_name)

            # Parse the file before touching the database, so an unreadable or
            # malformed file cannot leave the collection dropped and empty.
            # Assumes the file holds a JSON array of documents — TODO confirm.
            with open(json_path, 'r') as json_handle:
                documents = json.load(json_handle)

            db.drop_collection(collection_name)
            result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even when a load fails.
        client.close()

    return result

#### Creating the Databricks Metadata Database

In [0]:
%sql
-- Drop the lakehouse database and (CASCADE) everything it contains so the notebook can be re-run from scratch.
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Create the lakehouse database at its DBFS location (no-op if it already exists).
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

### Populating Dimension Tables From Various Sources
#### Fetching Reference Data From an Azure MySQL Database
Populating the Date Dimension Table

In [0]:
%sql
-- Dim Date Table
-- Create a Temporary View named "view_date" that reads the "dim_date" table
-- of the Azure MySQL "sakila_dw" database over JDBC.
-- NOTE(review): the password is stored in plain text here; consider a Databricks secret.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://amanzanares-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "amanzanares",    --Replace with your User Name
  password "Passw0rd123!"  --Replace with your password
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_date" using data from the view named "view_date".
-- The SELECT copies the rows read through the JDBC view into the DBFS location given below.
-- USE DATABASE makes sakila_dlh the current database; presumably later cells that use
-- unqualified table names rely on this — verify before removing it.
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column names/types and table details of the new date dimension.
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


Populating the Inventory Dimension Table

In [0]:
%sql
-- Inventory Dimension Table
-- Create a Temporary View named "view_inventory" that reads the "dim_inventory" table
-- of the Azure MySQL "sakila_dw" database over JDBC.
-- NOTE(review): the password is stored in plain text here; consider a Databricks secret.
CREATE OR REPLACE TEMPORARY VIEW view_inventory
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://amanzanares-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_inventory",
  user "amanzanares",    --Replace with your User Name
  password "Passw0rd123!"  --Replace with your password
)

In [0]:
%sql
-- Create a new table named "sakila_dlh.dim_inventory" using data from the view named "view_inventory".
-- The SELECT copies the rows read through the JDBC view into the DBFS location given below.
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_inventory
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/final_data/sakila_dlh/dim_inventory"
AS SELECT * FROM view_inventory

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column names/types and table details of the new inventory dimension.
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

col_name,data_type,comment
inventory_key,bigint,
inventory_id,bigint,
film_id,bigint,
store_id,bigint,
,,
# Delta Statistics Columns,,
Column Names,"inventory_key, inventory_id, film_id, store_id",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
-- Preview up to five rows to confirm the inventory dimension was populated.
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

inventory_key,inventory_id,film_id,store_id
1,1,1,1
2,2,1,1
3,3,1,1
4,4,1,1
5,5,1,2


#### Fetching Reference Data from a MongoDB Atlas Database
Creating the new MongoDB database and loading in JSON data

In [0]:
# Upload the customer dimension JSON from the batch folder into the Atlas
# "Customers" collection (mapping: collection name -> JSON file name).
source_dir = '/dbfs/FileStore/final_data/sakila/batch'
json_files = dict(Customers='Sakila_DimCustomer.json')

set_mongo_collection(
    user_id=atlas_user_name,
    pwd=atlas_password,
    cluster_name=atlas_cluster_name,
    db_name=atlas_database_name,
    src_file_path=source_dir,
    json_files=json_files,
)

<pymongo.results.InsertManyResult at 0x7f9696411c80>

Populating Customer Dimension Table

In [0]:
%scala
// Setting up connection to MongoDB
// Builds the Atlas SRV connection URI ("atlas_uri") from the user name, password and cluster name.
// NOTE(review): the credentials are written in plain text here; consider a Databricks secret.
import com.mongodb.spark._

val userName = "fnv2vx"
val pwd = "Passw0rd123!"
val clusterName = "ds2002.x5wij4u"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"


In [0]:
%scala
// Reading data from MongoDB into "df_customer"
// Source: the "Customers" collection of the "sakila_dw" database, reached through "atlas_uri".
// Only the columns listed in select(...) are kept.

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila_dw")
.option("collection", "Customers").load()
.select("customer_key","store_id","first_name","last_name","email","address_id","active")

display(df_customer)

customer_key,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1
6,2,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,10,1
7,1,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,11,1
8,2,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,12,1
9,2,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,13,1
10,1,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,14,1


In [0]:
%scala
// Print the column names and types of "df_customer".
df_customer.printSchema()

In [0]:
%scala
// Writing "df_customer" into datalake at "sakila_dlh.dim_customer"
// Stored in Delta format; "overwrite" replaces any existing contents of the table.
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
-- Inspect the column names/types and table details of the new customer dimension.
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
store_id,int,
first_name,string,
last_name,string,
email,string,
address_id,int,
active,int,
,,
# Delta Statistics Columns,,
Column Names,"first_name, email, store_id, address_id, last_name, customer_key, active",


In [0]:
%sql
-- Preview up to five rows to confirm the customer dimension was populated.
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_id,first_name,last_name,email,address_id,active
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


#### Fetching Data from a File System
Populating Staff Dimension Table

In [0]:
# Staff Dimension Table: location of the staff CSV in the batch folder.
staff_csv = "/".join([batch_dir, "Sakila_DimStaff.csv"])

# Read the CSV from the DBFS: the first row supplies the column names and the
# column types are inferred from the data.
df_staff = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(staff_csv)
)
display(df_staff)

my_row_id,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,password
1,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
2,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


In [0]:
# Print the inferred column names and types of "df_staff".
df_staff.printSchema()

root
 |-- my_row_id: integer (nullable = true)
 |-- staff_key: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)



In [0]:
# Persist "df_staff" in Delta format as "sakila_dlh.dim_staff", replacing any
# existing contents of that table.
(df_staff.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("sakila_dlh.dim_staff"))

In [0]:
%sql
-- Inspect the column names/types and table details of the new staff dimension.
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
my_row_id,int,
staff_key,int,
staff_id,int,
first_name,string,
last_name,string,
address_id,int,
email,string,
store_id,int,
active,int,
username,string,


In [0]:
%sql
-- Preview up to five rows to confirm the staff dimension was populated.
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

my_row_id,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,password
1,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
2,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


### Integrating Reference Data with Real-Time Data
#### Bronze Table: Processing 'Raw' JSON Data

In [0]:
# Stream the JSON files found in "rentals_stream_dir" with the AutoLoader
# ("cloudFiles" source) and expose the stream as the TempView
# "rentals_raw_tempview". The "cloudFiles.schemaLocation" option points at the
# "rentals_output_bronze" directory.
(spark.readStream
 .format("cloudFiles")
 .options(**{
     "cloudFiles.format": "json",
     "cloudFiles.schemaLocation": rentals_output_bronze,
     "cloudFiles.inferColumnTypes": "true",
     "multiLine": "true",
 })
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability:
   receipt_time = current_timestamp() at processing time,
   source_file  = input_file_name() of the file the row was read from. */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
-- Preview the bronze view; it is built on the streaming view "rentals_raw_tempview".
SELECT * FROM rentals_bronze_tempview

customer_key,fact_rental_key,inventory_key,rental_date_key,rental_id,return_date_key,staff_key,_rescued_data,receipt_time,source_file
419,10574,2328,20050731,10150,20050805,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
339,10575,2328,20050706,3536,20050712,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
254,10576,635,20050709,5080,20050711,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
190,10577,635,20050529,693,20050603,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
362,10578,635,20050822,14950,20050827,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
178,10579,635,20050616,1568,20050619,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
419,10580,2253,20050802,11025,20050808,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
475,10581,2253,20050527,417,20050529,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
475,10582,3784,20050708,4617,20050717,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json
419,10583,989,20050802,11425,20050803,2,,2024-05-08T15:32:42.166Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals03.json


In [0]:
# Start a streaming write that appends the rows of "rentals_bronze_tempview"
# to the Delta table "fact_rentals_bronze"; the checkpoint lives under the
# bronze output directory.
# NOTE(review): no trigger is configured and the returned StreamingQuery is not
# awaited, so later cells may run before all files are processed — confirm.
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f96a4f0a9d0>

#### Silver Table: Including Reference Data

In [0]:
# Read the bronze table "fact_rentals_bronze" as a stream and expose it as the
# TempView "rentals_silver_tempview", the input of the silver-layer join.
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
-- Preview the rows streamed from the bronze table.
SELECT * FROM rentals_silver_tempview

customer_key,fact_rental_key,inventory_key,rental_date_key,rental_id,return_date_key,staff_key,_rescued_data,receipt_time,source_file
546,5287,2760,20050802,11352,20050810,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
2,5288,2760,20050710,5755,20050719,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
341,5289,2760,20050615,1318,20050620,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
375,5290,1416,20050802,10961,20050809,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
64,5291,2098,20050802,10889,20050807,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
356,5292,2098,20050708,4599,20050711,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
182,5293,2139,20050819,13285,20050820,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
182,5294,3797,20050822,15043,20050828,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
220,5295,3797,20050711,6412,20050716,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json
559,5296,3797,20050730,9134,20050801,1,,2024-05-08T15:34:34.813Z,dbfs:/FileStore/final_data/sakila/stream/rentals/Sakila_Fact_Rentals02.json


In [0]:
%sql
-- Check the column types of the streamed rows before joining them to the dimension tables.
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_rental_key,bigint,
inventory_key,bigint,
rental_date_key,bigint,
rental_id,bigint,
return_date_key,bigint,
staff_key,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Creating a new Temporary View named "fact_rentals_silver_tempview" by selecting data from "rentals_silver_tempview" and joining it to the Customer, Staff, Inventory and Date dimension tables.
-- The date dimension is joined twice: once for the rental date (rentd) and once for the return date (returnd).
-- INNER JOINs drop rentals without a matching customer, inventory item or staff member;
-- the LEFT OUTER JOINs keep rentals even when a date key has no match in dim_date.
-- No columns are selected from dim_inventory (i); that join only filters rows.
-- NOTE(review): staff_password carries the staff password value into the fact table — confirm this is intended.

CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.fact_rental_key,
    r.rental_id,
    r.rental_date_key,
    rentd.day_name_of_week AS rental_day_name_of_week,
    rentd.day_of_month AS rental_day_of_month,
    rentd.weekday_weekend AS rental_weekday_weekend,
    rentd.month_name AS rental_month_name,
    rentd.calendar_quarter AS rental_quarter,
    rentd.calendar_year AS rental_year,
    r.return_date_key,
    returnd.day_name_of_week AS return_day_name_of_week,
    returnd.day_of_month AS return_day_of_month,
    returnd.weekday_weekend AS return_weekday_weekend,
    returnd.month_name AS return_month_name,
    returnd.calendar_quarter AS return_quarter,
    returnd.calendar_year AS return_year,
    r.customer_key,
    c.first_name AS customer_first_name,
    c.last_name AS customer_last_name,
    c.email AS customer_email,
    r.staff_key,
    s.first_name AS staff_first_name,
    s.last_name AS staff_last_name,
    s.email AS staff_email,
    s.username AS staff_username,
    s.password AS staff_password,
    r.inventory_key
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_key = r.inventory_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rentd
  ON rentd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS returnd
  ON returnd.date_key = r.return_date_key
)

In [0]:
%sql
-- Preview up to five joined rows to check the dimension attributes were attached.
SELECT * FROM fact_rentals_silver_tempview LIMIT 5

fact_rental_key,rental_id,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,customer_key,customer_first_name,customer_last_name,customer_email,staff_key,staff_first_name,staff_last_name,staff_email,staff_username,staff_password,inventory_key
5287,11352,20050802,Tuesday,2,Weekday,August,3,2005,20050810,Wednesday,10,Weekday,August,3,2005,546,KELLY,KNOTT,KELLY.KNOTT@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2760
5288,5755,20050710,Sunday,10,Weekend,July,3,2005,20050719,Tuesday,19,Weekday,July,3,2005,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2760
5289,1318,20050615,Wednesday,15,Weekday,June,2,2005,20050620,Monday,20,Weekday,June,2,2005,341,PETER,MENARD,PETER.MENARD@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2760
5290,10961,20050802,Tuesday,2,Weekday,August,3,2005,20050809,Tuesday,9,Weekday,August,3,2005,375,AARON,SELBY,AARON.SELBY@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,1416
5291,10889,20050802,Tuesday,2,Weekday,August,3,2005,20050807,Sunday,7,Weekend,August,3,2005,64,JUDITH,COX,JUDITH.COX@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2098


In [0]:
# Start a streaming write that appends the rows of
# "fact_rentals_silver_tempview" to the Delta table "fact_rentals_silver"; the
# checkpoint lives under the silver output directory.
# NOTE(review): no trigger is configured and the returned StreamingQuery is not
# awaited, so the next cells may query the table before it is fully loaded — confirm.
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f96962f68d0>

In [0]:
%sql
-- Query the silver table written by the streaming write above.
SELECT * FROM fact_rentals_silver

fact_rental_key,rental_id,rental_date_key,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year,return_date_key,return_day_name_of_week,return_day_of_month,return_weekday_weekend,return_month_name,return_quarter,return_year,customer_key,customer_first_name,customer_last_name,customer_email,staff_key,staff_first_name,staff_last_name,staff_email,staff_username,staff_password,inventory_key
5287,11352,20050802,Tuesday,2,Weekday,August,3,2005,20050810,Wednesday,10,Weekday,August,3,2005,546,KELLY,KNOTT,KELLY.KNOTT@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2760
5288,5755,20050710,Sunday,10,Weekend,July,3,2005,20050719,Tuesday,19,Weekday,July,3,2005,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2760
5289,1318,20050615,Wednesday,15,Weekday,June,2,2005,20050620,Monday,20,Weekday,June,2,2005,341,PETER,MENARD,PETER.MENARD@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2760
5290,10961,20050802,Tuesday,2,Weekday,August,3,2005,20050809,Tuesday,9,Weekday,August,3,2005,375,AARON,SELBY,AARON.SELBY@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,1416
5291,10889,20050802,Tuesday,2,Weekday,August,3,2005,20050807,Sunday,7,Weekend,August,3,2005,64,JUDITH,COX,JUDITH.COX@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2098
5292,4599,20050708,Friday,8,Weekday,July,3,2005,20050711,Monday,11,Weekday,July,3,2005,356,GERALD,FULTZ,GERALD.FULTZ@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2098
5293,13285,20050819,Friday,19,Weekday,August,3,2005,20050820,Saturday,20,Weekend,August,3,2005,182,RENEE,LANE,RENEE.LANE@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,2139
5294,15043,20050822,Monday,22,Weekday,August,3,2005,20050828,Sunday,28,Weekend,August,3,2005,182,RENEE,LANE,RENEE.LANE@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,3797
5295,6412,20050711,Monday,11,Weekday,July,3,2005,20050716,Saturday,16,Weekend,July,3,2005,220,CHARLENE,ALVAREZ,CHARLENE.ALVAREZ@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,3797
5296,9134,20050730,Saturday,30,Weekend,July,3,2005,20050801,Monday,1,Weekday,August,3,2005,559,EVERETT,BANDA,EVERETT.BANDA@sakilacustomer.org,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike,8cb2237d0679ca88db6464eac60da96345513964,3797


In [0]:
%sql
-- Inspect the column names/types and table details of the silver fact table.
DESCRIBE EXTENDED sakila_dlh.fact_rentals_silver

col_name,data_type,comment
fact_rental_key,bigint,
rental_id,bigint,
rental_date_key,bigint,
rental_day_name_of_week,varchar(10),
rental_day_of_month,tinyint,
rental_weekday_weekend,varchar(10),
rental_month_name,varchar(10),
rental_quarter,tinyint,
rental_year,int,
return_date_key,bigint,


#### Gold Table: Performing Aggregations
This query counts the number of rentals (Total Rentals) per customer last name and orders the results by that count in descending order, so you can see which customers rented the most. Note that customers who share a last name are counted together.

In [0]:
%sql
-- Gold table: number of rentals ("TotalRentals") per customer last name ("Customer"),
-- aggregated from the silver fact table.
-- No ORDER BY inside the CTAS: a stored table has no guaranteed row order,
-- so sorting there would not control the order in which rows are read back.
CREATE OR REPLACE TABLE sakila_dlh.fact_rentals_gold AS (
    SELECT customer_last_name AS Customer
        , COUNT(rental_id) AS TotalRentals
    FROM sakila_dlh.fact_rentals_silver
    GROUP BY  Customer
);

-- Return "Total Rentals" in descending order; the sort is applied when reading,
-- with the last name as a tie-breaker so equal counts appear in a stable order.
SELECT * FROM sakila_dlh.fact_rentals_gold
ORDER BY TotalRentals DESC, Customer;

Customer,TotalRentals
HUNT,46
SEAL,45
SHAW,42
DEAN,41
BULL,40
PETERS,40
CARY,39
KENNEDY,39
COLLAZO,38
SNYDER,38


### Cleaning up the File System

In [0]:
%fs rm -r /FileStore/final_data/