## DS-3002: Sample Capstone Project
This notebook demonstrates many of the software libraries and programming techniques needed to complete the end-of-session capstone project for course **DS-3002: Data Systems** at the University of Virginia School of Data Science. The project is designed as a capstone challenge in which students demonstrate a practical, functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Relational Databases Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Relational Databases Optimized for Read/Aggregation Operations; Dimensional Model (i.e., Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

The project also requires students to decide whether to implement a cloud-hosted, on-premises, or hybrid architecture.

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # pandas API on Spark (formerly Koalas), included in PySpark 3.2 and newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Connection Information ##########################
jdbc_hostname = "ds2002.mysql.database.azure.com"
jdbc_port = 3306  # MySQL default port (1433 is the SQL Server port); matches the JDBC URLs used below
src_database = "sakila"

connection_properties = {
  "user" : "admin2",
  "password" : "mypass123!!",
  "driver" : "org.mariadb.jdbc.Driver"  # must match the jdbc:mysql:// URL built in get_sql_dataframe()
  #"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sakila"
atlas_database_name = "sakila"
atlas_user_name = "ani_ponugoti"
atlas_password = "PineApple"

# Data Files (JSON) Information ###############################
dst_database = "sakila"

base_dir = "dbfs:/FileStore/ds2002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_sales_orders/bronze"
output_silver = f"{database_dir}/fact_sales_orders/silver"
output_gold   = f"{database_dir}/fact_sales_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales_orders", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[107]: True
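
The connection settings above embed usernames and passwords directly in the notebook. A minimal sketch of one alternative, assuming the credentials are supplied through environment variables. The helper `get_required_env()` and the variable names `ATLAS_USER_NAME` and `ATLAS_PASSWORD` are hypothetical and not part of the original notebook:

```python
import os

def get_required_env(name, environ=os.environ):
    """Return the value of an environment variable, or fail with a clear message."""
    value = environ.get(name)
    if not value:
        raise KeyError(f"Environment variable {name!r} is not set")
    return value

# Hypothetical usage (the variable names are assumptions, not from the notebook):
# atlas_user_name = get_required_env("ATLAS_USER_NAME")
# atlas_password = get_required_env("ATLAS_PASSWORD")
```

On Databricks, a secret scope read with `dbutils.secrets.get(scope=..., key=...)` could serve the same purpose.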

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Query the Azure-hosted MySQL database over JDBC and return a Spark DataFrame.'''
    # Build the JDBC URL for the database.
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    # spark.read.jdbc() expects a table name or a parenthesized subquery with an alias, e.g. "(SELECT ...) AS t".
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zvzvnbm.mongodb.net/{db_name}?retryWrites=true&w=majority"
    
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB, and fill a Python list with documents to create a DataFrame.
    db = client[db_name]
    # Empty or None arguments fall back to "match everything", "all fields", and "no sort".
    cursor = db[collection].find(conditions or {}, projection or None)
    if sort:
        cursor = cursor.sort(sort)
    dframe = pd.DataFrame(list(cursor))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zvzvnbm.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
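    # Read each JSON file and use it to (re)create the collection named by its key.
    result = None  # stays None if json_files is empty
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        # Only the InsertManyResult of the last file loaded is returned.
        result = db[collection_name].insert_many(json_object)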

    client.close()
    
    return result
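
The `sql_query` argument of `get_sql_dataframe()` is passed to `spark.read.jdbc()` as its `table` parameter, which accepts either a table name or a parenthesized subquery with an alias. A minimal sketch of a wrapper that puts a plain `SELECT` statement into that form; the helper name `as_jdbc_subquery()` and the default alias `src` are assumptions for illustration:

```python
def as_jdbc_subquery(sql_query, alias="src"):
    """Wrap a SELECT statement so it can be passed as the `table` argument of spark.read.jdbc()."""
    # Drop surrounding whitespace and a trailing semicolon, which is not valid inside a subquery.
    cleaned = sql_query.strip().rstrip(";").strip()
    return f"({cleaned}) AS {alias}"

print(as_jdbc_subquery("SELECT * FROM sakilalt.vdim_customer;"))
# (SELECT * FROM sakilalt.vdim_customer) AS src
```

The wrapped string could then be supplied as the last argument of `get_sql_dataframe()`.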

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure SQL Database
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a View in an Azure SQL database.

In [0]:
%sql
DROP DATABASE IF EXISTS sakila CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/ds2002-capstone/sakila"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002.mysql.database.azure.com:3306/sakilalt",
  dbtable "sakilalt.vdim_customer",
  user "admin2",
  password "mypass123!!"
)

In [0]:
%sql
USE DATABASE sakila;

CREATE TABLE IF NOT EXISTS sakila.dim_customer
COMMENT "Customers Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-capstone/sakila/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila.dim_customer LIMIT 5

customer_key,store_id,first_name,last_name
1,1,MARY,SMITH
2,1,PATRICIA,JOHNSON
3,1,LINDA,WILLIAMS
4,2,BARBARA,JONES
5,1,ELIZABETH,BROWN


In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_customer;

col_name,data_type,comment
customer_key,bigint,
store_id,bigint,
first_name,string,
last_name,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila,
Table,dim_customer,
Type,EXTERNAL,


##### 1.2. Create a New Table that Sources its Data from a Table in an Azure SQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002.mysql.database.azure.com:3306/sakilalt",
  dbtable "sakilalt.dim_date",
  user "admin2",
  password "mypass123!!"
)

In [0]:
%sql
USE DATABASE sakila;

CREATE TABLE IF NOT EXISTS sakila.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-capstone/sakila/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_date;

col_name,data_type,comment
date_key,int,
full_date,string,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-capstone/source_data/batch/sakila_dim_customer.json,sakila_dim_customer.json,60317,1670971109000
dbfs:/FileStore/ds2002-capstone/source_data/batch/sakila_dim_payment.json,sakila_dim_payment.json,133950,1670971109000
dbfs:/FileStore/ds2002-capstone/source_data/batch/sakila_dim_rental.csv,sakila_dim_rental.csv,51753,1670971110000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent: it drops each target collection before reloading it.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-capstone/source_data/batch'
json_files = {"payment" : 'sakila_dim_payment.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

Out[120]: <pymongo.results.InsertManyResult at 0x7fc4341e4100>
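
Both MongoDB helper functions interpolate the username and password directly into the connection URI. Credentials containing characters such as `@`, `/`, or `:` need to be percent-escaped first. A minimal sketch using the standard library; the helper `build_atlas_uri()` and the placeholder host and credentials are hypothetical:

```python
from urllib.parse import quote_plus

def build_atlas_uri(user_id, pwd, cluster_host, db_name):
    """Build a mongodb+srv URI with the username and password percent-escaped."""
    user = quote_plus(user_id)
    password = quote_plus(pwd)
    return f"mongodb+srv://{user}:{password}@{cluster_host}/{db_name}?retryWrites=true&w=majority"

# "cluster0.example.mongodb.net" is a placeholder host, not the notebook's cluster.
print(build_atlas_uri("app_user", "p@ss/word", "cluster0.example.mongodb.net", "sakila"))
# mongodb+srv://app_user:p%40ss%2Fword@cluster0.example.mongodb.net/sakila?retryWrites=true&w=majority
```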

##### 2.3. Fetch Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_payment = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "sakila")
  .option("collection", "payment")
  .option("uri", "mongodb+srv://ani_ponugoti:PineApple@sakila.zvzvnbm.mongodb.net/test")
  .load()
display(df_payment)

_id,amount,customer_id,payment_date,payment_key,rental_id
List(639a9131f085f2aefc76564d),2.99,1,2005-05-25 11:30:37,1,76
List(639a9131f085f2aefc76564e),0.99,1,2005-05-28 10:35:23,2,573
List(639a9131f085f2aefc76564f),5.99,1,2005-06-15 00:54:12,3,1185
List(639a9131f085f2aefc765650),0.99,1,2005-06-15 18:02:53,4,1422
List(639a9131f085f2aefc765651),9.99,1,2005-06-15 21:08:46,5,1476
List(639a9131f085f2aefc765652),4.99,1,2005-06-16 15:18:57,6,1725
List(639a9131f085f2aefc765653),4.99,1,2005-06-18 08:41:48,7,2308
List(639a9131f085f2aefc765654),0.99,1,2005-06-18 13:33:59,8,2363
List(639a9131f085f2aefc765655),3.99,1,2005-06-21 06:24:45,9,3284
List(639a9131f085f2aefc765656),5.99,1,2005-07-08 03:17:05,10,4526


In [0]:
%scala
df_payment.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (sakila) Metadata Database

In [0]:
%scala
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila.dim_payment")

In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_payment

col_name,data_type,comment
_id,struct,
amount,double,
customer_id,int,
payment_date,string,
payment_key,int,
rental_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila,


##### 2.5. Query the New Table in the Databricks Metadata Database

In [0]:
%sql
SELECT * FROM sakila.dim_payment LIMIT 5

_id,amount,customer_id,payment_date,payment_key,rental_id
List(639a9131f085f2aefc76564d),2.99,1,2005-05-25 11:30:37,1,76
List(639a9131f085f2aefc76564e),0.99,1,2005-05-28 10:35:23,2,573
List(639a9131f085f2aefc76564f),5.99,1,2005-06-15 00:54:12,3,1185
List(639a9131f085f2aefc765650),0.99,1,2005-06-15 18:02:53,4,1422
List(639a9131f085f2aefc765651),9.99,1,2005-06-15 21:08:46,5,1476


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
rental_csv = f"{batch_dir}/sakila_dim_rental.csv"

df_rental = spark.read.format('csv').options(header='true', inferSchema='true').load(rental_csv)
display(df_rental)

rental_key,rental_date,customer_id,return_date
1,2005-05-24T22:53:30.000+0000,130,2005-05-26T22:04:30.000+0000
10,2005-05-25T00:02:21.000+0000,399,2005-05-31T22:44:21.000+0000
100,2005-05-25T16:50:28.000+0000,208,2005-06-02T22:11:28.000+0000
1000,2005-05-31T00:25:56.000+0000,332,2005-06-08T19:42:56.000+0000
1001,2005-05-31T00:46:31.000+0000,64,2005-06-06T06:14:31.000+0000
101,2005-05-25T17:17:04.000+0000,468,2005-05-31T19:47:04.000+0000
102,2005-05-25T17:22:10.000+0000,343,2005-05-31T19:47:10.000+0000
103,2005-05-25T17:30:42.000+0000,384,2005-06-03T22:36:42.000+0000
104,2005-05-25T17:46:33.000+0000,310,2005-05-27T15:20:33.000+0000
105,2005-05-25T17:54:12.000+0000,108,2005-05-30T12:03:12.000+0000


In [0]:
df_rental.printSchema()

root
 |-- rental_key: integer (nullable = true)
 |-- rental_date: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- return_date: timestamp (nullable = true)



In [0]:
df_rental.write.format("delta").mode("overwrite").saveAsTable("sakila.dim_rental")

In [0]:
%sql
DESCRIBE EXTENDED sakila.dim_rental;

col_name,data_type,comment
rental_key,int,
rental_date,timestamp,
customer_id,int,
return_date,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila,
Table,dim_rental,
Type,MANAGED,


In [0]:
%sql
SELECT * FROM sakila.dim_rental LIMIT 5;

rental_key,rental_date,customer_id,return_date
1,2005-05-24T22:53:30.000+0000,130,2005-05-26T22:04:30.000+0000
10,2005-05-25T00:02:21.000+0000,399,2005-05-31T22:44:21.000+0000
100,2005-05-25T16:50:28.000+0000,208,2005-06-02T22:11:28.000+0000
1000,2005-05-31T00:25:56.000+0000,332,2005-06-08T19:42:56.000+0000
1001,2005-05-31T00:46:31.000+0000,64,2005-06-06T06:14:31.000+0000


##### Verify Dimension Tables

In [0]:
%sql
USE sakila;
SHOW TABLES

database,tableName,isTemporary
sakila,dim_customer,False
sakila,dim_date,False
sakila,dim_payment,False
sakila,dim_rental,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True
,fact_orders_silver_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
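 # NOTE: Spark keeps one value per option key, so of the repeated "cloudFiles.schemaHints" calls below only the last takes effect.
 .option("cloudFiles.schemaHints", "rental_key INT")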
 .option("cloudFiles.schemaHints", "rental_date DATETIME")
 .option("cloudFiles.schemaHints", "customer_id BIGINT")
 .option("cloudFiles.schemaHints", "return_date DATETIME") 
 .option("cloudFiles.schemaHints", "payment_id BIGINT")
 .option("cloudFiles.schemaHints", "amount DOUBLE")
 .option("cloudFiles.schemaHints", "payment_date DATETIME")
 .option("cloudFiles.schemaHints", "store_id BIGINT")
 .option("cloudFiles.schemaHints", "first_name STRING") 
 .option("cloudFiles.schemaHints", "last_name STRING")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))
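
Reader options are stored by key, so repeating `.option("cloudFiles.schemaHints", ...)` keeps only the final value. Auto Loader accepts multiple hints as one comma-separated string. A minimal sketch of building that string from the column names used in the cell above; the helper `build_schema_hints()` is hypothetical, and `TIMESTAMP` is used here on the assumption that it is the intended type, since `DATETIME` is not a Spark SQL type name:

```python
def build_schema_hints(hints):
    """Join {column: type} pairs into one comma-separated schemaHints string."""
    return ", ".join(f"{column} {data_type}" for column, data_type in hints.items())

# Column names come from the cell above; TIMESTAMP replaces DATETIME (an assumption).
schema_hints = build_schema_hints({
    "rental_key": "INT",
    "rental_date": "TIMESTAMP",
    "customer_id": "BIGINT",
    "return_date": "TIMESTAMP",
    "payment_id": "BIGINT",
    "amount": "DOUBLE",
    "payment_date": "TIMESTAMP",
    "store_id": "BIGINT",
    "first_name": "STRING",
    "last_name": "STRING",
})
print(schema_hints)

# The combined string would then be passed once:
#   .option("cloudFiles.schemaHints", schema_hints)
```

With all hints applied, the date columns would be read as timestamps rather than the strings that appear in the recorded outputs of this notebook.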

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

amount,customer_id,first_name,last_name,payment_date,payment_id,rental_date,rental_key,return_date,store_id,_rescued_data,receipt_time,source_file
4.99,511,CHESTER,BENNER,2005-07-28 16:05:38,13770,2005-07-28 16:05:38,8026,2005-07-29 21:28:38,1,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
5.99,436,TROY,QUIGLEY,2005-07-28 16:09:57,11754,2005-07-28 16:09:57,8027,2005-08-05 18:17:57,1,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
8.99,137,RHONDA,KENNEDY,2005-07-28 16:11:15,3696,2005-07-28 16:11:15,8028,2005-08-05 15:11:15,2,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
6.99,61,KATHERINE,RIVERA,2005-07-28 16:11:21,1675,2005-07-28 16:11:21,8029,2005-08-05 13:52:21,2,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
4.99,30,MELISSA,KING,2005-07-28 16:12:53,833,2005-07-28 16:12:53,8030,2005-07-31 17:15:53,1,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
0.99,40,AMANDA,CARTER,2005-07-28 16:15:49,1107,2005-07-28 16:15:49,8031,2005-08-02 18:27:49,2,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
4.99,230,JOY,GEORGE,2005-07-28 16:17:00,6203,2005-07-28 16:17:00,8032,2005-07-29 13:32:00,2,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
4.99,1,MARY,SMITH,2005-07-28 16:18:23,17,2005-07-28 16:18:23,8033,2005-07-30 17:56:23,1,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
4.99,502,BRETT,CORNWELL,2005-07-28 16:20:26,13530,2005-07-28 16:20:26,8034,2005-08-04 19:11:26,1,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json
6.99,14,BETTY,WHITE,2005-07-28 16:23:01,370,2005-07-28 16:23:01,8035,2005-08-05 10:52:01,2,,2022-12-15T03:15:20.152+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders03.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

Out[132]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc43419fc10>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

amount,customer_id,first_name,last_name,payment_date,payment_id,rental_date,rental_key,return_date,store_id,_rescued_data,receipt_time,source_file
2.99,130,CHARLOTTE,HUNTER,2005-05-24 22:53:30,3504,2005-05-24 22:53:30,1,2005-05-26 22:04:30,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
2.99,459,TOMMY,COLLAZO,2005-05-24 22:54:33,12377,2005-05-24 22:54:33,2,2005-05-28 19:40:33,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
3.99,408,MANUEL,MURRELL,2005-05-24 23:03:39,11032,2005-05-24 23:03:39,3,2005-06-01 22:12:39,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
4.99,333,ANDREW,PURDY,2005-05-24 23:04:41,8987,2005-05-24 23:04:41,4,2005-06-03 01:43:41,2,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
6.99,222,DELORES,HANSEN,2005-05-24 23:05:21,6003,2005-05-24 23:05:21,5,2005-06-02 04:33:21,2,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
0.99,549,NELSON,CHRISTENSON,2005-05-24 23:08:07,14728,2005-05-24 23:08:07,6,2005-05-27 01:32:07,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
1.99,269,CASSANDRA,WALTERS,2005-05-24 23:11:53,7274,2005-05-24 23:11:53,7,2005-05-29 20:34:53,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
4.99,239,MINNIE,ROMERO,2005-05-24 23:31:46,6440,2005-05-24 23:31:46,8,2005-05-27 23:33:46,2,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
4.99,126,ELLEN,SIMPSON,2005-05-25 00:00:40,3386,2005-05-25 00:00:40,9,2005-05-28 00:22:40,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json
5.99,399,DANNY,ISOM,2005-05-25 00:02:21,10785,2005-05-25 00:02:21,10,2005-05-31 22:44:21,1,,2022-12-15T03:15:24.001+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders01.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
amount,double,
customer_id,bigint,
first_name,string,
last_name,string,
payment_date,string,
payment_id,bigint,
rental_date,string,
rental_key,bigint,
return_date,string,
store_id,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.rental_key
    , rd.month_name AS rental_month
    , rd.day_name_of_week AS rental_day_name
    , rd.day_of_month AS rental_day
    , rd.calendar_year AS rental_year
    , pd.month_name AS payment_month
    , pd.day_name_of_week AS payment_day_name
    , pd.day_of_month AS payment_day
    , pd.calendar_year AS payment_year
    , red.month_name AS return_month
    , red.day_name_of_week AS return_day_name
    , red.day_of_month AS return_day
    , red.calendar_year AS return_year
    , t.store_id
    , t.first_name
    , t.last_name
    , t.customer_id
    , t.amount
    , t.payment_id
  FROM orders_silver_tempview t
  INNER JOIN sakila.dim_rental r
  ON t.rental_key = r.rental_key
  INNER JOIN sakila.dim_payment p
  ON t.payment_id = p.payment_key
  INNER JOIN sakila.dim_customer c
  ON t.customer_id = c.customer_key
  INNER JOIN sakila.dim_date rd
  ON CAST(t.rental_date AS DATE) = rd.full_date
  INNER JOIN sakila.dim_date pd
  ON CAST(t.payment_date AS DATE) = pd.full_date
  INNER JOIN sakila.dim_date red
  ON CAST(t.return_date AS DATE) = red.full_date)
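
The view above joins to `dim_date` by casting each timestamp string to a date and comparing it with `full_date`. The dimension also carries an integer `date_key` in `yyyymmdd` form (for example `20000101`). A minimal sketch of deriving such a key from the timestamp format seen in the streaming data; the helper `to_date_key()` is hypothetical and shown only to illustrate the mapping:

```python
from datetime import datetime

def to_date_key(timestamp_text):
    """Convert a 'YYYY-MM-DD HH:MM:SS' string into an integer key of the form YYYYMMDD."""
    parsed = datetime.strptime(timestamp_text, "%Y-%m-%d %H:%M:%S")
    return parsed.year * 10000 + parsed.month * 100 + parsed.day

print(to_date_key("2005-05-24 22:53:30"))  # 20050524
```

In Spark SQL, an analogous expression might be `CAST(date_format(t.rental_date, 'yyyyMMdd') AS INT)`, which could be compared with `date_key` instead of `full_date`.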

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

Out[137]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc434149c10>

In [0]:
%sql
SELECT * FROM fact_orders_silver

rental_key,rental_month,rental_day_name,rental_day,rental_year,payment_month,payment_day_name,payment_day,payment_year,return_month,return_day_name,return_day,return_year,store_id,first_name,last_name,customer_id,amount,payment_id


In [0]:
%sql
DESCRIBE EXTENDED sakila.fact_orders_silver

col_name,data_type,comment
rental_key,bigint,
rental_month,string,
rental_day_name,string,
rental_day,int,
rental_year,int,
payment_month,string,
payment_day_name,string,
payment_day,int,
payment_year,int,
return_month,string,


##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customer_id
  , last_name
  , first_name
  , rental_month
  , SUM(amount) AS total_payment
FROM sakila.fact_orders_silver
GROUP BY customer_id, last_name, first_name, rental_month
ORDER BY total_payment DESC

customer_id,last_name,first_name,rental_month,total_payment
19,MARTINEZ,RUTH,May,26.940000000000005
14,WHITE,BETTY,May,20.95
22,RODRIGUEZ,LAURA,May,18.97
7,MILLER,MARIA,May,14.96
18,GARCIA,CAROL,May,12.97
23,LEWIS,SARAH,May,11.98
20,ROBINSON,SHARON,May,11.97
35,GREEN,VIRGINIA,May,10.98
17,THOMPSON,DONNA,May,10.97
32,LOPEZ,AMY,May,9.98
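
The first total above, `26.940000000000005`, is a binary floating-point artifact of summing `DOUBLE` values. A minimal sketch of the effect and of two common remedies, using amounts from the first three `dim_payment` rows shown earlier:

```python
from decimal import Decimal

# Amounts taken from the first three dim_payment rows shown earlier.
amounts = ["2.99", "0.99", "5.99"]

float_total = sum(float(a) for a in amounts)      # binary floating point; may carry trailing digits
decimal_total = sum(Decimal(a) for a in amounts)  # exact decimal arithmetic

print(float_total)
print(decimal_total)                   # 9.97
print(round(26.940000000000005, 2))    # 26.94
```

In Spark SQL, the analogous options would be `ROUND(SUM(amount), 2)` or casting `amount` to a `DECIMAL` type before aggregating.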


In [0]:
%sql
SELECT customer_id
  , last_name
  , first_name
  , rental_year
  , COUNT(rental_key) AS total_rentals
FROM sakila.fact_orders_silver
GROUP BY customer_id, last_name, first_name, rental_year
ORDER BY total_rentals DESC

customer_id,last_name,first_name,rental_year,total_rentals
19,MARTINEZ,RUTH,2005,6
14,WHITE,BETTY,2005,5
7,MILLER,MARIA,2005,4
6,DAVIS,JENNIFER,2005,3
17,THOMPSON,DONNA,2005,3
21,CLARK,MICHELLE,2005,3
22,RODRIGUEZ,LAURA,2005,3
18,GARCIA,CAROL,2005,3
16,MARTIN,SANDRA,2005,3
20,ROBINSON,SHARON,2005,3
