## Final Project - Sai Singamsetty (sbs3ja)
Please refer to this GitHub repository for this project: https://github.com/SaiSetty/DS2002Final_Sai
There are a few **necessary steps to take before running this notebook file**.
1. From the GitHub repository linked above, first import the classic_data_mart.sql file into MySQL Workbench.
2. Then upload the final_data/classic_retail folder from the repository to Databricks DBFS (the notebook expects it at dbfs:/FileStore/final_data/classic_retail).
3. The notebook file that I submitted to Canvas has all my credentials filled out, but the notebook file in the GitHub repository does not, so those need to be filled in before running it again. *However, both notebook files were saved after executing the code and contain its outputs.*
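Because the GitHub copy ships with blank passwords, a small guard cell run right after the global variables can fail fast instead of producing a confusing connection error later. This is a minimal sketch: the helper names `missing_credentials` and `require_credentials` are hypothetical, and the dictionary keys simply mirror the Python globals defined in "2.0. Instantiate Global Variables". The `%sql` and `%scala` cells have their own password fields, which this check does not cover. On Databricks, a common alternative to typing passwords into cells is reading them from a secret scope with `dbutils.secrets.get(scope=..., key=...)`, where the scope and key names would be your own.

```python
# Hypothetical guard cell: stop early if a credential placeholder is still blank.

def missing_credentials(credentials):
    """Return the names of credentials whose values are None, empty, or whitespace-only."""
    return [name for name, value in credentials.items() if not value or not value.strip()]


def require_credentials(credentials):
    """Raise a ValueError listing every blank credential; return None when all are set."""
    missing = missing_credentials(credentials)
    if missing:
        raise ValueError("Fill in these credentials before running: " + ", ".join(missing))


# Example with the notebook's blank placeholders (assumed names, mirroring section 2.0)
credentials = {
    "connection_properties['password']": "",  # Azure MySQL password
    "atlas_password": "",                      # MongoDB Atlas password
}
print(missing_credentials(credentials))
```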

Summary:
In this project I use Azure Databricks to populate a dimensional data lakehouse that represents a business process (ordering cars). The existing database I use to populate the data lakehouse is *classic_data_mart*, the final database from my midterm project.
My 4 dimension tables are dim_date, dim_customer, dim_employee, and dim_product, and my fact table is fact_orders. They are sourced as follows:
- Relational database management system (Azure MySQL): dim_date and dim_customer.
- NoSQL *(Not Only SQL)* system (MongoDB Atlas): dim_product.
- File system *(data lake)* source (a CSV file in DBFS): dim_employee.
- Streaming JSON files processed through the bronze, silver, and gold architecture: fact_orders, refined from raw records (bronze) to records joined with the dimensions (silver) to aggregates of coarser granularity (gold).

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd  # pandas API on Spark (formerly Koalas), included in PySpark 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "sbs3ja-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classic_data_mart"

connection_properties = {
  "user" : "sbs3ja",
  "password" : "", #Insert password here
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.qfqddoo"
atlas_database_name = "classic_data_mart"
atlas_user_name = "sbs3ja"
atlas_password = "" #Insert password here

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dlh"

base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/classic_retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [None]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB; conditions, projection and sort are each optional and applied independently
    db = client[db_name]
    cursor = db[collection].find(conditions or {}, projection or None)
    if sort:
        cursor = cursor.sort(sort)

    # Fill a Python list with the documents to create a DataFrame
    dframe = pd.DataFrame(list(cursor))

    client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
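def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create (or re-create) one collection per JSON file; json_files maps collection name -> file name.'''
    # Create a client connection to MongoDB
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    # Read in each JSON file and use it to create a new collection.
    # Dropping the collection first is what makes re-running this function idempotent.
    result = None
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

    client.close()

    # InsertManyResult of the last collection loaded (None if json_files is empty)
    return result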
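Both helpers interpolate the password straight into the `mongodb+srv://` URI. If a password contains characters such as `@`, `:` or `/`, the URI no longer parses as intended; PyMongo's documentation recommends percent-escaping the username and password with `urllib.parse.quote_plus`. The sketch below shows that escaping in a hypothetical `build_atlas_uri` helper that could replace the f-string in both functions; the example password is made up.

```python
from urllib.parse import quote_plus


def build_atlas_uri(user_id, pwd, cluster_name, db_name=""):
    """Build a mongodb+srv URI with the username and password percent-escaped.

    quote_plus() escapes characters such as '@', ':' and '/' that would otherwise
    be read as URI delimiters.
    """
    return (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
            f"@{cluster_name}.mongodb.net/{db_name}")


# Made-up password containing URI delimiter characters
print(build_atlas_uri("sbs3ja", "p@ss:word/1", "sandbox.qfqddoo", "classic_data_mart"))
# mongodb+srv://sbs3ja:p%40ss%3Aword%2F1@sandbox.qfqddoo.mongodb.net/classic_data_mart
```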

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 

#### 1.0. Fetch Reference Data From an Azure MySQL Database (dim_date and dim_customer)
##### 1.1. Create a New Databricks Metadata Database.

In [None]:
%sql
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS-2002 Final Project"
LOCATION "dbfs:/FileStore/final_data/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://sbs3ja-mysql.mysql.database.azure.com:3306/classic_data_mart", --server name
  dbtable "dim_date",
  user "sbs3ja",    --Replace with your User Name
  password ""  --Replace with your password
)
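The `%sql` cells hardcode the JDBC URL, although the same pieces already exist as `jdbc_hostname`, `jdbc_port` and `src_database` in section 2.0. A minimal sketch of deriving the URL from those globals is below; `build_jdbc_url` is a hypothetical helper, and the values are copied from section 2.0. The commented PySpark lines use the real `spark.read.jdbc(url=..., table=..., properties=...)` API with the notebook's `connection_properties`, assuming the driver configured there accepts `jdbc:mysql` URLs (they are not executed here).

```python
# Values copied from "2.0. Instantiate Global Variables"
jdbc_hostname = "sbs3ja-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classic_data_mart"


def build_jdbc_url(hostname, port, database):
    """Return a MySQL JDBC URL of the form jdbc:mysql://host:port/database."""
    return f"jdbc:mysql://{hostname}:{port}/{database}"


jdbc_url = build_jdbc_url(jdbc_hostname, jdbc_port, src_database)
print(jdbc_url)

# In a Databricks Python cell, the view could then be created without %sql:
# df_date = spark.read.jdbc(url=jdbc_url, table="dim_date", properties=connection_properties)
# df_date.createOrReplaceTempView("view_date")
```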

In [None]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [None]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [None]:
%sql
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20030101,2003-01-01,2003/01/01,01/01/2003,01/01/2003,4,Wednesday,1,1,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030102,2003-01-02,2003/01/02,01/02/2003,02/01/2003,5,Thursday,2,2,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030103,2003-01-03,2003/01/03,01/03/2003,03/01/2003,6,Friday,3,3,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030104,2003-01-04,2003/01/04,01/04/2003,04/01/2003,7,Saturday,4,4,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030105,2003-01-05,2003/01/05,01/05/2003,05/01/2003,1,Sunday,5,5,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
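The sample rows suggest how several dim_date columns are derived: `date_key` is the date as a yyyymmdd integer, `day_of_week` counts Sunday as 1 (Wednesday is 4, Saturday is 7), and January maps to fiscal month 7 and fiscal quarter 3, i.e. a fiscal year starting in July. These conventions are inferred from the rows shown above, not from the midterm ETL code, and the sketch reproduces only a subset of the columns (for example, it does not compute `fiscal_year`).

```python
from datetime import date

# Fixed English names so the output does not depend on the system locale
DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")


def date_attributes(d):
    """Derive a few dim_date columns for one date (conventions inferred from the sample rows)."""
    # Sunday = 1 ... Saturday = 7, matching day_of_week in the sample output
    day_of_week = d.isoweekday() % 7 + 1
    # Assumption: fiscal year starts in July (January -> fiscal month 7 in the sample rows)
    fiscal_month = (d.month - 7) % 12 + 1
    return {
        "date_key": d.year * 10000 + d.month * 100 + d.day,  # yyyymmdd as an integer
        "day_of_week": day_of_week,
        "day_name_of_week": DAY_NAMES[d.weekday()],
        "weekday_weekend": "Weekend" if d.weekday() >= 5 else "Weekday",
        "calendar_quarter": (d.month - 1) // 3 + 1,
        "fiscal_month_of_year": fiscal_month,
        "fiscal_quarter": (fiscal_month - 1) // 3 + 1,
    }


print(date_attributes(date(2003, 1, 4)))
```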


##### 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [None]:
%sql
-- Create a Temporary View named "view_customer" that extracts data from the dim_customers table in my MySQL classic_data_mart database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://sbs3ja-mysql.mysql.database.azure.com:3306/classic_data_mart", --server name
  dbtable "dim_customers",
  user "sbs3ja",    --Replace with your User Name
  password ""  --Replace with your password
)

In [None]:
%sql
USE DATABASE classicmodels_dlh;

-- Create a new table named "classicmodels_dlh.dim_customer" using data from the view named "view_customer"
CREATE OR REPLACE TABLE classicmodels_dlh.dim_customer
COMMENT "Customers Dimension Table"
LOCATION "dbfs:/FileStore/final_data/classicmodels_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [None]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
customer_id,bigint,
customerName,varchar(65535),
contactLastName,varchar(65535),
contactFirstName,varchar(65535),
phone,varchar(65535),
city,varchar(65535),
postalCode,varchar(65535),
country,varchar(65535),
salesRepEmployeeNumber,double,


In [None]:
%sql
SELECT * FROM classicmodels_dlh.dim_customer LIMIT 5

customer_key,customer_id,customerName,contactLastName,contactFirstName,phone,city,postalCode,country,salesRepEmployeeNumber,creditLimit
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,Nantes,44000,France,1370.0,21000.0
2,112,Signal Gift Stores,King,Jean,7025551838,Las Vegas,83030,USA,1166.0,71800.0
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,Melbourne,3004,Australia,1611.0,117300.0
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,Nantes,44000,France,1370.0,118200.0
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Stavern,4110,Norway,1504.0,81700.0


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database (dim_product)
##### 2.1. View the Data Files on the Databricks File System

In [None]:
display(dbutils.fs.ls(batch_dir))  # dbfs:/FileStore/final_data/classic_retail/batch

path,name,size,modificationTime
dbfs:/FileStore/final_data/classic_retail/batch/Classicmodels_DimEmployees.csv,Classicmodels_DimEmployees.csv,1857,1715189282000
dbfs:/FileStore/final_data/classic_retail/batch/Classicmodels_DimProducts.json,Classicmodels_DimProducts.json,30213,1715187649000
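`dbutils.fs.ls` and Spark address these files with the `dbfs:/` URI held in `batch_dir`, but `set_mongo_collection()` opens them with Python's built-in `open()`, which needs the local `/dbfs/...` path that Databricks exposes through its FUSE mount on clusters where that mount is available. The cell in 2.2 hardcodes that local path as `source_dir`; a minimal sketch of deriving it from `batch_dir` instead is below. The helper name `dbfs_uri_to_fuse_path` is hypothetical.

```python
def dbfs_uri_to_fuse_path(uri):
    """Convert a 'dbfs:/...' URI (used by Spark and dbutils) into the '/dbfs/...'
    local path that plain Python file APIs such as open() need on Databricks."""
    prefix = "dbfs:/"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a dbfs URI: {uri}")
    return "/dbfs/" + uri[len(prefix):]


batch_dir = "dbfs:/FileStore/final_data/classic_retail/batch"
source_dir = dbfs_uri_to_fuse_path(batch_dir)
print(source_dir)  # /dbfs/FileStore/final_data/classic_retail/batch
```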


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent: it drops each target collection before inserting the documents again.

In [None]:
source_dir = '/dbfs/FileStore/final_data/classic_retail/batch'
json_files = {"products" : 'Classicmodels_DimProducts.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7fc7389bac40>

##### 2.3.1. Fetch Product Dimension Data from the New MongoDB Collection

In [None]:
%scala
import com.mongodb.spark._

val userName = "sbs3ja"
val pwd = "" // Insert password here
val clusterName = "sandbox.qfqddoo"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [None]:
%scala

val df_product = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "classic_data_mart")
.option("collection", "products").load()
.select("product_key", "product_id", "productName", "productLine", "productScale", "productVendor", "quantityInStock", "buyPrice", "MSRP")

display(df_product)

product_key,product_id,productName,productLine,productScale,productVendor,quantityInStock,buyPrice,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,3252,85.68,136.0
6,S10_4962,1962 LanciaA Delta 16V,Classic Cars,1:10,Second Gear Diecast,6791,103.42,147.74
7,S12_1099,1968 Ford Mustang,Classic Cars,1:12,Autoart Studio Design,68,95.34,194.57
8,S12_1108,2001 Ferrari Enzo,Classic Cars,1:12,Second Gear Diecast,3619,95.59,207.8
9,S12_1666,1958 Setra Bus,Trucks and Buses,1:12,Welly Diecast Productions,1579,77.9,136.67
10,S12_2823,2002 Suzuki XREO,Motorcycles,1:12,Unimax Art Galleries,9997,66.27,150.62


In [None]:
%scala
df_product.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Product Dimension Table in the Databricks Metadata Database (classicmodels_dlh)

In [None]:
%scala
df_product.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_product")

In [None]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_product

col_name,data_type,comment
product_key,int,
product_id,string,
productName,string,
productLine,string,
productScale,string,
productVendor,string,
quantityInStock,int,
buyPrice,double,
MSRP,double,
,,


In [None]:
%sql
SELECT * FROM classicmodels_dlh.dim_product LIMIT 5

product_key,product_id,productName,productLine,productScale,productVendor,quantityInStock,buyPrice,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,3252,85.68,136.0


#### 3.0. Fetch Data from a File System (dim_employee)
##### 3.1. Use PySpark to Read dim_employee data From a CSV File

In [None]:
employee_csv = f"{batch_dir}/Classicmodels_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employee_key,employee_id,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
3,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
4,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
5,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)
6,1143,Bow,Anthony,x5428,abow@classicmodelcars.com,1,1056.0,Sales Manager (NA)
7,1165,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143.0,Sales Rep
8,1166,Thompson,Leslie,x4065,lthompson@classicmodelcars.com,1,1143.0,Sales Rep
9,1188,Firrelli,Julie,x2173,jfirrelli@classicmodelcars.com,2,1143.0,Sales Rep
10,1216,Patterson,Steve,x4334,spatterson@classicmodelcars.com,2,1143.0,Sales Rep


In [None]:
df_employee.printSchema()

root
 |-- employee_key: integer (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- email: string (nullable = true)
 |-- officeCode: integer (nullable = true)
 |-- reportsTo: string (nullable = true)
 |-- jobTitle: string (nullable = true)
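The schema shows `reportsTo` as a string holding values such as `1002.0`, with an empty value for the President, while `employee_id` is an integer, so a self-join from an employee to their manager would need a conversion first. The plain-Python sketch below shows one way to normalize those values (float-formatted strings to int, blank to `None`) on a few rows copied from the output above; `parse_reports_to` is a hypothetical helper. In PySpark, a cast along the lines of `col("reportsTo").cast("double").cast("int")` would be a possible counterpart, untested here.

```python
import csv
import io


def parse_reports_to(raw):
    """Convert reportsTo strings such as '1002.0' to int, and '' or None to None (no manager)."""
    raw = (raw or "").strip()
    return int(float(raw)) if raw else None


# A few rows copied from the output above, as CSV text
sample_csv = """employee_id,lastName,reportsTo
1002,Murphy,
1056,Patterson,1002.0
1143,Bow,1056.0
"""

rows = list(csv.DictReader(io.StringIO(sample_csv)))
for row in rows:
    row["reportsTo"] = parse_reports_to(row["reportsTo"])
print([(r["lastName"], r["reportsTo"]) for r in rows])
```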



In [None]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_employee")

In [None]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_employee;

col_name,data_type,comment
employee_key,int,
employee_id,int,
lastName,string,
firstName,string,
extension,string,
email,string,
officeCode,int,
reportsTo,string,
jobTitle,string,
,,


In [None]:
%sql
SELECT * FROM classicmodels_dlh.dim_employee LIMIT 5;

employee_key,employee_id,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
3,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
4,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
5,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)


##### Verify Dimension Tables

In [None]:
%sql
USE classicmodels_dlh;
SHOW TABLES

database,tableName,isTemporary
classicmodels_dlh,dim_customer,False
classicmodels_dlh,dim_date,False
classicmodels_dlh,dim_employee,False
classicmodels_dlh,dim_product,False
,view_customer,True
,view_date,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use Auto Loader to Process Streaming (Hot-Path) Orders Fact Data


##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [None]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # Optional type overrides. Repeated .option() calls with the same key overwrite each other,
 # so all hints must be passed as ONE comma-separated string:
 #.option("cloudFiles.schemaHints",
 #        "fact_order_key BIGINT, order_id BIGINT, order_details_id BIGINT, customer_key BIGINT, "
 #        "order_date_key DECIMAL, required_date_key DECIMAL, shipped_date_key DECIMAL, "
 #        "quantityOrdered DECIMAL, priceEach DECIMAL, status STRING, product_key BIGINT, employee_key BIGINT")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))
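Auto Loader only ingests files it has not already processed; that state lives in the checkpoint of the query that consumes this stream (the `checkpointLocation` set when the bronze table is written). This is also why the setup cell deletes the fact_orders directory: removing the checkpoint makes the next run ingest every file again. The sketch below is a deliberately simplified, stdlib-only analogue of that "process each file once" idea using a JSON list of seen file names; it is not how Auto Loader actually stores its state, and `process_new_files` is a hypothetical helper.

```python
import json
import os
import tempfile


def process_new_files(input_dir, checkpoint_path):
    """Return the names of files not seen in earlier runs, and record them in the checkpoint.

    Simplified analogue of a streaming checkpoint: each input file is
    returned exactly once across repeated runs.
    """
    seen = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            seen = set(json.load(f))

    new_files = sorted(name for name in os.listdir(input_dir) if name not in seen)

    with open(checkpoint_path, "w") as f:
        json.dump(sorted(seen | set(new_files)), f)
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    orders_dir = os.path.join(tmp, "orders")
    os.makedirs(orders_dir)
    checkpoint = os.path.join(tmp, "checkpoint.json")

    open(os.path.join(orders_dir, "Classicmodels_FactOrders01.json"), "w").close()
    print(process_new_files(orders_dir, checkpoint))  # first run: file 01
    open(os.path.join(orders_dir, "Classicmodels_FactOrders02.json"), "w").close()
    print(process_new_files(orders_dir, checkpoint))  # second run: only file 02
    print(process_new_files(orders_dir, checkpoint))  # third run: nothing new
```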

In [None]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [None]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,employee_key,fact_order_key,order_date_key,order_details_id,order_id,priceEach,product_key,quantityOrdered,required_date_key,shipped_date_key,status,_rescued_data,receipt_time,source_file
3,18,1000,20040220,1165,10223,80.39,1,37,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1001,20040220,1166,10223,110.61,3,47,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1002,20040220,1167,10223,189.79,4,49,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1003,20040220,1168,10223,67.58,31,47,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1004,20040220,1169,10223,58.75,32,28,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1005,20040220,1170,10223,104.81,58,32,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1006,20040220,1171,10223,87.54,60,34,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1007,20040220,1172,10223,60.94,62,38,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1008,20040220,1173,10223,68.1,83,23,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json
3,18,1009,20040220,1174,10223,90.9,86,21,20040229,20040224.0,Shipped,,2024-05-10T20:53:39.817Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders02.json


In [None]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc738174250>

##### 6.2. Silver Table: Include Reference Data

In [None]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [None]:
%sql
SELECT * FROM orders_silver_tempview

customer_key,employee_key,fact_order_key,order_date_key,order_details_id,order_id,priceEach,product_key,quantityOrdered,required_date_key,shipped_date_key,status,_rescued_data,receipt_time,source_file
86,10,1,20030106,1,10100,136.0,23,30,20030113,20030110.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,2,20030106,2,10100,55.09,27,50,20030113,20030110.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,3,20030106,3,10100,75.46,50,22,20030113,20030110.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,4,20030106,4,10100,35.29,80,49,20030113,20030110.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,5,20031120,864,10192,99.04,16,27,20031129,20031125.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,6,20031120,865,10192,140.12,18,22,20031129,20031125.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,7,20031120,866,10192,100.8,21,29,20031129,20031125.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,8,20031120,867,10192,70.84,24,45,20031129,20031125.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,9,20031120,868,10192,128.03,25,47,20031129,20031125.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json
86,10,10,20031120,869,10192,110.88,34,38,20031129,20031125.0,Shipped,,2024-05-10T20:54:17.054Z,dbfs:/FileStore/final_data/classic_retail/stream/orders/Classicmodels_FactOrders01.json


In [None]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
employee_key,bigint,
fact_order_key,bigint,
order_date_key,bigint,
order_details_id,bigint,
order_id,bigint,
priceEach,double,
product_key,bigint,
quantityOrdered,bigint,
required_date_key,bigint,


In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.order_id,
      o.order_details_id,

      o.employee_key,
      e.lastName AS employee_last_name,
      e.firstName AS employee_first_name,
      e.jobTitle AS employee_job_title,
      e.email AS employee_email,

      o.customer_key,
      c.customerName AS customer_name,
      c.contactLastName AS contact_last_name,
      c.contactFirstName AS contact_first_name,
      c.city AS customer_city,
      c.country AS customer_country,
      c.creditLimit AS customer_credit_limit,

      o.product_key,
      p.productName,
      p.productLine,
      p.productScale,
      p.productVendor,
      p.quantityInStock AS product_quantity_in_stock,
      p.buyPrice AS product_buy_price,
      p.MSRP AS product_MSRP,

      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,

      o.required_date_key,
      rd.day_name_of_week AS required_day_name_of_week,
      rd.day_of_month AS required_day_of_month,
      rd.weekday_weekend AS required_weekday_weekend,
      rd.month_name AS required_month_name,
      rd.calendar_quarter AS required_calendar_quarter,
      rd.calendar_year AS required_calendar_year,

      o.shipped_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,

      o.quantityOrdered,
      o.priceEach,
      o.status
  FROM orders_silver_tempview AS o
  INNER JOIN classicmodels_dlh.dim_employee AS e
  ON e.employee_key = o.employee_key
  INNER JOIN classicmodels_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN classicmodels_dlh.dim_product AS p
  ON p.product_key = o.product_key
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS rd
  ON rd.date_key = o.required_date_key
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
)
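The silver view uses INNER JOINs for employee, customer and product but LEFT OUTER JOINs for the three date roles, presumably so that orders without a shipped date are kept. A consequence is that an order whose employee_key, customer_key or product_key has no match in its dimension is silently dropped from silver. The plain-Python sketch below mimics those two join behaviours on made-up rows (order 3 has a hypothetical unknown employee key); `enrich_orders` is a hypothetical helper, not part of the notebook.

```python
def enrich_orders(orders, employees, dates):
    """Mimic the silver view's join types on plain dicts.

    - INNER JOIN on employee_key: orders without a matching employee are dropped.
    - LEFT OUTER JOIN on shipped_date_key: unshipped orders (None) are kept,
      with None for the shipped-date attributes.
    """
    result = []
    for o in orders:
        e = employees.get(o["employee_key"])
        if e is None:  # inner join: no match, no output row
            continue
        sd = dates.get(o["shipped_date_key"], {})  # left join: no match -> empty attributes
        result.append({
            "fact_order_key": o["fact_order_key"],
            "employee_last_name": e["lastName"],
            "shipped_day_name_of_week": sd.get("day_name_of_week"),
        })
    return result


employees = {10: {"lastName": "Patterson"}}
dates = {20030110: {"day_name_of_week": "Friday"}}
orders = [
    {"fact_order_key": 1, "employee_key": 10, "shipped_date_key": 20030110},
    {"fact_order_key": 2, "employee_key": 10, "shipped_date_key": None},      # not yet shipped
    {"fact_order_key": 3, "employee_key": 99, "shipped_date_key": 20030110},  # unknown employee
]
print(enrich_orders(orders, employees, dates))
```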

In [None]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc714f8a0e0>

In [None]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,order_id,order_details_id,employee_key,employee_last_name,employee_first_name,employee_job_title,employee_email,customer_key,customer_name,contact_last_name,contact_first_name,customer_city,customer_country,customer_credit_limit,product_key,productName,productLine,productScale,productVendor,product_quantity_in_stock,product_buy_price,product_MSRP,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,required_date_key,required_day_name_of_week,required_day_of_month,required_weekday_weekend,required_month_name,required_calendar_quarter,required_calendar_year,shipped_date_key,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_calendar_quarter,shipped_calendar_year,quantityOrdered,priceEach,status
1,10100,1,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,23,1917 Grand Touring Sedan,Vintage Cars,1:18,Welly Diecast Productions,2724,86.7,170.0,20030106,Monday,6,Weekday,January,1,2003,20030113,Monday,13,Weekday,January,1,2003,20030110.0,Friday,10.0,Weekday,January,1.0,2003.0,30,136.0,Shipped
2,10100,2,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,27,1911 Ford Town Car,Vintage Cars,1:18,Motor City Art Classics,540,33.3,60.54,20030106,Monday,6,Weekday,January,1,2003,20030113,Monday,13,Weekday,January,1,2003,20030110.0,Friday,10.0,Weekday,January,1.0,2003.0,50,55.09,Shipped
3,10100,3,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,50,1932 Alfa Romeo 8C2300 Spider Sport,Vintage Cars,1:18,Exoto Designs,6553,43.26,92.03,20030106,Monday,6,Weekday,January,1,2003,20030113,Monday,13,Weekday,January,1,2003,20030110.0,Friday,10.0,Weekday,January,1.0,2003.0,22,75.46,Shipped
4,10100,4,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,80,1936 Mercedes Benz 500k Roadster,Vintage Cars,1:24,Red Start Diecast,2081,21.75,41.03,20030106,Monday,6,Weekday,January,1,2003,20030113,Monday,13,Weekday,January,1,2003,20030110.0,Friday,10.0,Weekday,January,1.0,2003.0,49,35.29,Shipped
5,10192,864,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,16,1969 Dodge Charger,Classic Cars,1:12,Welly Diecast Productions,7323,58.73,115.16,20031120,Thursday,20,Weekday,November,4,2003,20031129,Saturday,29,Weekend,November,4,2003,20031125.0,Tuesday,25.0,Weekday,November,4.0,2003.0,27,99.04,Shipped
6,10192,865,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,18,1993 Mazda RX-7,Classic Cars,1:18,Highway 66 Mini Classics,3975,83.51,141.54,20031120,Thursday,20,Weekday,November,4,2003,20031129,Saturday,29,Weekend,November,4,2003,20031125.0,Tuesday,25.0,Weekday,November,4.0,2003.0,22,140.12,Shipped
7,10192,866,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,21,1965 Aston Martin DB5,Classic Cars,1:18,Classic Metal Creations,9042,65.96,124.44,20031120,Thursday,20,Weekday,November,4,2003,20031129,Saturday,29,Weekend,November,4,2003,20031125.0,Tuesday,25.0,Weekday,November,4.0,2003.0,29,100.8,Shipped
8,10192,867,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,24,1948 Porsche 356-A Roadster,Classic Cars,1:18,Gearbox Collectibles,8826,53.9,77.0,20031120,Thursday,20,Weekday,November,4,2003,20031129,Saturday,29,Weekend,November,4,2003,20031125.0,Tuesday,25.0,Weekday,November,4.0,2003.0,45,70.84,Shipped
9,10192,868,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,25,1995 Honda Civic,Classic Cars,1:18,Min Lin Diecast,9772,93.89,142.25,20031120,Thursday,20,Weekday,November,4,2003,20031129,Saturday,29,Weekend,November,4,2003,20031125.0,Tuesday,25.0,Weekday,November,4.0,2003.0,47,128.03,Shipped
10,10192,869,10,Patterson,Steve,Sales Rep,spatterson@classicmodelcars.com,86,Online Diecast Creations Co.,Young,Dorothy,Nashua,USA,114200.0,34,1999 Indy 500 Monte Carlo SS,Classic Cars,1:18,Red Start Diecast,8164,56.76,132.0,20031120,Thursday,20,Weekday,November,4,2003,20031129,Saturday,29,Weekend,November,4,2003,20031125.0,Tuesday,25.0,Weekday,November,4.0,2003.0,38,110.88,Shipped



In [None]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
order_id,bigint,
order_details_id,bigint,
employee_key,bigint,
employee_last_name,string,
employee_first_name,string,
employee_job_title,string,
employee_email,string,
customer_key,bigint,
customer_name,varchar(65535),


##### 6.3. Gold Table: Perform Aggregations
I created 3 new gold tables using the CTAS (CREATE TABLE ... AS SELECT) approach. In all three, `COUNT(product_key)` counts order lines (one row per product per order in fact_orders_silver), not units sold.
1. The first table shows, for each employee and order month, the number of order lines that employee sold, along with the employee key (EmployeeID), first and last name, and the month of the order. Months are grouped by name only, so the same month in different years is combined. It is ordered by employee key, then by product count (descending).
2. The second table shows the total number of order lines placed by each customer, along with the customer key (CustomerID) and customer name. It is ordered by product count (descending).
3. The third table shows the number of order lines for each product line (product type) per customer country. It is ordered by country, then by product count (descending).
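To make the choice of measure concrete, the plain-Python sketch below contrasts the gold tables' `COUNT(product_key)` with `SUM(quantityOrdered)`, which would be the measure if units rather than order lines were wanted. The rows are made up (loosely modelled on the silver output) and only carry the columns the first gold table groups on.

```python
from collections import Counter, defaultdict

# Made-up silver rows: (employee_key, order_month_name, product_key, quantityOrdered)
silver_rows = [
    (7, "October", 23, 30),
    (7, "October", 27, 50),
    (7, "March", 23, 22),
    (8, "October", 50, 49),
]

# COUNT(product_key) ... GROUP BY employee, month -> number of order lines
line_counts = Counter((emp, month) for emp, month, _, _ in silver_rows)

# SUM(quantityOrdered) ... GROUP BY employee, month -> number of units sold
unit_totals = defaultdict(int)
for emp, month, _, qty in silver_rows:
    unit_totals[(emp, month)] += qty

print(line_counts[(7, "October")], unit_totals[(7, "October")])  # 2 80
```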

In [None]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_monthly_sales_by_employee_gold AS (
  SELECT employee_key AS EmployeeID
    , employee_first_name AS Employee_FirstName
    , employee_last_name AS Employee_LastName
    , order_month_name AS OrderMonth
    , COUNT(product_key) AS ProductCount
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY EmployeeID, Employee_LastName, Employee_FirstName, OrderMonth
  ORDER BY EmployeeID, ProductCount DESC);

SELECT * FROM classicmodels_dlh.fact_monthly_sales_by_employee_gold;

EmployeeID,Employee_FirstName,Employee_LastName,OrderMonth,ProductCount
7,Leslie,Jennings,October,63
7,Leslie,Jennings,March,48
7,Leslie,Jennings,July,39
7,Leslie,Jennings,February,32
7,Leslie,Jennings,November,29
7,Leslie,Jennings,August,29
7,Leslie,Jennings,January,21
7,Leslie,Jennings,April,21
7,Leslie,Jennings,December,17
7,Leslie,Jennings,May,16


In [None]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_product_orders_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_name
    , COUNT(product_key) AS ProductCount
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerID, customer_name
  ORDER BY ProductCount DESC);

SELECT * FROM classicmodels_dlh.fact_product_orders_by_customer_gold;

CustomerID,customer_name,ProductCount
11,Euro+ Shopping Channel,259
6,Mini Gifts Distributors Ltd.,180
3,"Australian Collectors, Co.",55
4,La Rochelle Gifts,53
30,"AV Stores, Co.",51
10,Land of Toys Inc.,49
16,Muscle Machine Inc,48
57,Rovelli Gifts,48
122,Kelly's Gift Shop,48
58,Souveniers And Things Co.,46


In [None]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_product_orders_by_type_country_gold AS (
  SELECT customer_country AS CustomerCountry
    , productLine AS ProductType
    , COUNT(product_key) AS ProductCount
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerCountry, ProductType
  ORDER BY CustomerCountry, ProductCount DESC);

SELECT * FROM classicmodels_dlh.fact_product_orders_by_type_country_gold;

CustomerCountry,ProductType,ProductCount
Australia,Vintage Cars,58
Australia,Classic Cars,53
Australia,Motorcycles,26
Australia,Planes,25
Australia,Trucks and Buses,20
Australia,Ships,2
Australia,Trains,1
Austria,Classic Cars,25
Austria,Vintage Cars,10
Austria,Planes,6


#### 9.0. Clean up the File System

In [None]:
%fs rm -r /FileStore/final_data