## Eric Nguyen (wvu9cs)
## Final Project: Course Capstone
The goal of the second dat project, building upon the first project, is to further demonstrate my understanding of and competence in implementing the data science systems covered throughout this course (e.g., Relational & NoSQL databases, ETL process piplines, data transformations, SQL and Python scripts, API's and cloud services).

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- A Cloud File System like Azure Data Lakes
  - Various Datafile Formats (e.g., JSON, CSV)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
# Import any necessary packages/libraries
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "wvu9cs-mysql2.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw2"

connection_properties = {
  "user" : "wvu9cs",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002.wysflex"
atlas_database_name = "sakila_dw2"
atlas_user_name = "wvu9cs"
atlas_password = "Passw0rd123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-final"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

rentals_output_bronze = f"{database_dir}/fact_orders/bronze"
rentals_output_silver = f"{database_dir}/fact_orders/silver"
rentals_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[2]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.
Date Dimension Table #1: Relational MySQL Database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_date",
  user "jtupitza",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final/source_data/batch/dim_customers.json,dim_customers.json,87794,1683666575000
dbfs:/FileStore/ds2002-final/source_data/batch/dim_rentals.json,dim_rentals.json,160475,1683666575000
dbfs:/FileStore/ds2002-final/source_data/batch/dim_staff.csv,dim_staff.csv,136,1683667517000
dbfs:/FileStore/ds2002-final/source_data/batch/factrentals1.json,factrentals1.json,675999,1683211936000
dbfs:/FileStore/ds2002-final/source_data/batch/factrentals2.json,factrentals2.json,679616,1683211936000
dbfs:/FileStore/ds2002-final/source_data/batch/factrentals3.json,factrentals3.json,687271,1683211936000
dbfs:/FileStore/ds2002-final/source_data/batch/sakila_customer.json,sakila_customer.json,160173,1683267234000
dbfs:/FileStore/ds2002-final/source_data/batch/sakila_rental.json,sakila_rental.json,215475,1683267234000
dbfs:/FileStore/ds2002-final/source_data/batch/sakila_store.csv,sakila_store.csv,309,1683267234000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final/source_data/batch'
json_files = {"rentals" : 'dim_rentals.json', "customers" : 'dim_customers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[11]: <pymongo.results.InsertManyResult at 0x7f758c267080>

##### 2.3.1. Fetch Rental Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val uri = "mongodb+srv://wvu9cs:Passw0rd123@ds2002.wysflex.mongodb.net/sakila_dw2"

val df_rentals = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",  uri)
.option("database", "sakila_dw2").option("collection", "rentals").load()
.select("rental_key","rental_date","inventory_key", "customer_key","return_date","staff_key")

display(df_rentals)

rental_key,rental_date,inventory_key,customer_key,return_date,staff_key
1,2005-05-24,367,130,2005-05-26,1
2,2005-05-24,1525,459,2005-05-28,1
3,2005-05-24,1711,408,2005-06-01,1
4,2005-05-24,2452,333,2005-06-03,2
5,2005-05-24,2079,222,2005-06-02,1
6,2005-05-24,2792,549,2005-05-27,1
7,2005-05-24,3995,269,2005-05-29,2
8,2005-05-24,2346,239,2005-05-27,2
9,2005-05-25,2580,126,2005-05-28,1
10,2005-05-25,1824,399,2005-05-31,2


In [0]:
%scala
df_rentals.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Rentals Dimension Table in the Databricks Metadata Database (sakila_dlh)
Rentals Dimensional Table #2: NoSQL MongoDB Database

In [0]:
%scala
df_rentals.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_rentals")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_rentals

col_name,data_type,comment
rental_key,int,
rental_date,string,
inventory_key,int,
customer_key,int,
return_date,string,
staff_key,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_rentals LIMIT 5

rental_key,rental_date,inventory_key,customer_key,return_date,staff_key
1,2005-05-24,367,130,2005-05-26,1
2,2005-05-24,1525,459,2005-05-28,1
3,2005-05-24,1711,408,2005-06-01,1
4,2005-05-24,2452,333,2005-06-03,2
5,2005-05-24,2079,222,2005-06-02,1


##### 2.3.3. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val uri = "mongodb+srv://wvu9cs:Passw0rd123@ds2002.wysflex.mongodb.net/sakila_dw2"

val df_customers = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",  uri)
.option("database", "sakila_dw2").option("collection", "customers").load()
.select("customer_key","first_name", "last_name","email","active")

display(df_customers)

customer_key,first_name,last_name,email,active
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1
6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,1
7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,1
8,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,1
9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,1
10,DOROTHY,TAYLOR,DOROTHY.TAYLOR@sakilacustomer.org,1


In [0]:
%scala
df_customers.printSchema()

##### 2.3.4. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (sakila_dlh)
Customers Dimensional Table #3: NoSQL MongoDB Database

In [0]:
%scala
df_customers.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customers")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customers

col_name,data_type,comment
customer_key,int,
first_name,string,
last_name,string,
email,string,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customers,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customers LIMIT 5

customer_key,first_name,last_name,email,active
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1
2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1
4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,1
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File
Staff Dimensional Table #4: Cloud File System (CSV) Database

In [0]:
staff_csv = f"{batch_dir}/dim_staff.csv"

df_staff = spark.read.format('csv').options(header='true', inferSchema='true').load(staff_csv)
display(df_staff)

staff_key,first_name,last_name,email,active
1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1
2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1


In [0]:
df_staff.printSchema()

root
 |-- staff_key: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- active: integer (nullable = true)



In [0]:
df_staff.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_staff")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_staff;

col_name,data_type,comment
staff_key,int,
first_name,string,
last_name,string,
email,string,
active,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_staff,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_staff LIMIT 5;

staff_key,first_name,last_name,email,active
1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1
2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customers,False
sakila_dlh,dim_date,False
sakila_dlh,dim_rentals,False
sakila_dlh,dim_staff,False
,view_date,True


#### 7.0. Use AutoLoader to Process Streaming (Hot Path) Rentals Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "transaction_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_key BIGINT")
 .option("cloudFiles.schemaHints", "customer_key BIGINT")
 .option("cloudFiles.schemaHints", "rental_date_key BIGINT") 
 .option("cloudFiles.schemaHints", "staff_key BIGINT")
 .option("cloudFiles.schemaHints", "return_date BIGINT")
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals
)

In [0]:
%sql
SELECT * FROM rentals_bronze_tempview

customer_key,rental_date_key,rental_key,return_date,staff_key,transaction_key,_rescued_data,receipt_time,source_file
538,,10701,1123545600000.0,1,10697,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
560,,10702,1123545600000.0,2,10698,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
181,,10703,1123372800000.0,2,10699,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
594,,10704,1123459200000.0,1,10700,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
381,,10705,1123113600000.0,1,10701,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
147,,10706,1123632000000.0,1,10702,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
558,,10707,1123286400000.0,1,10703,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
567,,10708,1123545600000.0,1,10704,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
418,,10709,1123286400000.0,2,10705,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json
177,,10710,1123027200000.0,1,10706,,2023-05-10T00:50:02.666+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals3.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

Out[38]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f758592bb80>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

customer_key,rental_date_key,rental_key,return_date,staff_key,transaction_key,_rescued_data,receipt_time,source_file
130,,1,1117065600000,1,1,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
459,,2,1117238400000,1,2,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
408,,3,1117584000000,1,3,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
333,,4,1117756800000,2,4,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
222,,5,1117670400000,1,5,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
549,,6,1117152000000,1,6,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
269,,7,1117324800000,2,7,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
239,,8,1117152000000,2,8,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
126,,9,1117238400000,1,9,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json
399,,10,1117497600000,2,10,,2023-05-10T00:46:26.202+0000,dbfs:/FileStore/ds2002-final/source_data/stream/orders/factrentals1.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
customer_key,bigint,
rental_date_key,string,
rental_key,bigint,
return_date,bigint,
staff_key,bigint,
transaction_key,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.transaction_key,

      r.rental_key AS rental_key,
      re.rental_date AS rental_date,
      re.return_date AS return_date,

      r.customer_key AS customer_key,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      c.email AS customer_email,
      c.active AS customer_active,
      
      r.staff_key AS staff_key,
      s.first_name AS staff_first_name,
      s.last_name AS staff_last_name,
      s.email AS staff_email,
      s.active AS staff_active,


      r.rental_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year
      
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_rentals AS re
  ON re.rental_key = r.rental_key
  INNER JOIN sakila_dlh.dim_customers AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS od
  ON od.date_key = r.rental_date_key
)

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

Out[51]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f758c156460>

In [0]:
%sql
SELECT * FROM fact_rentals_silver

transaction_key,rental_key,rental_date,return_date,customer_key,customer_first_name,customer_last_name,customer_email,customer_active,staff_key,staff_first_name,staff_last_name,staff_email,staff_active,rental_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year
1,1,2005-05-24,2005-05-26,130,CHARLOTTE,HUNTER,CHARLOTTE.HUNTER@sakilacustomer.org,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,,,,,,,
2,2,2005-05-24,2005-05-28,459,TOMMY,COLLAZO,TOMMY.COLLAZO@sakilacustomer.org,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,,,,,,,
3,3,2005-05-24,2005-06-01,408,MANUEL,MURRELL,MANUEL.MURRELL@sakilacustomer.org,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,,,,,,,
4,4,2005-05-24,2005-06-03,333,ANDREW,PURDY,ANDREW.PURDY@sakilacustomer.org,1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,,,,,,,
5,5,2005-05-24,2005-06-02,222,DELORES,HANSEN,DELORES.HANSEN@sakilacustomer.org,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,,,,,,,
6,6,2005-05-24,2005-05-27,549,NELSON,CHRISTENSON,NELSON.CHRISTENSON@sakilacustomer.org,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,,,,,,,
7,7,2005-05-24,2005-05-29,269,CASSANDRA,WALTERS,CASSANDRA.WALTERS@sakilacustomer.org,1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,,,,,,,
8,8,2005-05-24,2005-05-27,239,MINNIE,ROMERO,MINNIE.ROMERO@sakilacustomer.org,1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,,,,,,,
9,9,2005-05-25,2005-05-28,126,ELLEN,SIMPSON,ELLEN.SIMPSON@sakilacustomer.org,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,,,,,,,
10,10,2005-05-25,2005-05-31,399,DANNY,ISOM,DANNY.ISOM@sakilacustomer.org,1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,,,,,,,


In [0]:
%sql
DESCRIBE EXTENDED fact_rentals_silver

col_name,data_type,comment
transaction_key,bigint,
rental_key,bigint,
rental_date,string,
return_date,string,
customer_key,bigint,
customer_first_name,string,
customer_last_name,string,
customer_email,string,
customer_active,int,
staff_key,bigint,


##### 7.3. Gold Table: Perform Aggregations
This records all of the customers who made a rental with Staff Member 1

In [0]:
%sql
SELECT 
    c.customer_key, 
    c.first_name, 
    c.last_name, 
    c.email, 
    c.active,
    s.staff_key
FROM 
    sakila_dlh.dim_customers AS c
    JOIN sakila_dlh.dim_rentals AS r ON c.customer_key = r.customer_key 
    JOIN sakila_dlh.dim_staff s ON r.staff_key = s.staff_key 
WHERE 
    s.staff_key = 1


customer_key,first_name,last_name,email,active,staff_key
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1
3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,1,1
5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,1,1
6,JENNIFER,DAVIS,JENNIFER.DAVIS@sakilacustomer.org,1,1
7,MARIA,MILLER,MARIA.MILLER@sakilacustomer.org,1,1
8,SUSAN,WILSON,SUSAN.WILSON@sakilacustomer.org,1,1
9,MARGARET,MOORE,MARGARET.MOORE@sakilacustomer.org,1,1
11,LISA,ANDERSON,LISA.ANDERSON@sakilacustomer.org,1,1
12,NANCY,THOMAS,NANCY.THOMAS@sakilacustomer.org,1,1
14,BETTY,WHITE,BETTY.WHITE@sakilacustomer.org,1,1
