## Final Project: Captstone

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "upm6ce-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels_dw"

connection_properties = {
  "user" : "upm6ce",
  "password" : "D@t@B3t@!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.p4wvwnt"
atlas_database_name = "classicmodels_dw"
atlas_user_name = "upm6ce"
atlas_password = "C0nn3ctF0ur"

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dlh"

base_dir = "dbfs:/FileStore/final_project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/classicmodels"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

False

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final_project_data/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://upm6ce-mysql.mysql.database.azure.com:3306/classicmodels_dw", 
  dbtable "dim_date",
  user "upm6ce",    
  password "D@t@B3t@!"  
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_project_data/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://upm6ce-mysql.mysql.database.azure.com:3306/classicmodels_dw", 
  dbtable "dim_products",
  user "upm6ce",    
  password "D@t@B3t@!"  
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_product
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/final_project_data/classicmodels_dlh/dim_product"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
productCode,varchar(65535),
productName,varchar(65535),
productLine,varchar(65535),
productVendor,varchar(65535),
productDescription,varchar(65535),
quantityInStock,bigint,
buyPrice,double,
MSRP,double,
,,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_product LIMIT 5

product_key,productCode,productName,productLine,productVendor,productDescription,quantityInStock,buyPrice,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,"This replica features working kickstand, front suspension, gear-shift lever, footbrake lever, drive chain, wheels and steering. All parts are particularly delicate due to their precise scale and require special care and attention.",7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddle bags located on side of motorcycle, detailed engine, working steering, working suspension, two leather seats, luggage rack, dual exhaust pipes, small saddle bag located on handle bars, two-tone paint with chrome accents, superior die-cast detail , rotating wheels , working kick stand, diecast metal with plastic parts and baked enamel finish.",6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,Red Start Diecast,"Model features, official Harley Davidson logos and insignias, detachable rear wheelie bar, heavy diecast metal with resin parts, authentic multi-color tampo-printed graphics, separate engine drive belts, free-turning front fork, rotating tires and rear racing slick, certificate of authenticity, detailed engine, display stand , precision diecast replica, baked enamel finish, 1:10 scale model, removable fender, seat and tank cover piece for displaying the superior detail of the v-twin engine",5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,Motor City Art Classics,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/final_project_data/classicmodels/batch'

path,name,size,modificationTime
dbfs:/FileStore/final_project_data/classicmodels/batch/Classicmodels_DimCustomers.json,Classicmodels_DimCustomers.json,46698,1715329615000
dbfs:/FileStore/final_project_data/classicmodels/batch/Classicmodels_DimEmployees.csv,Classicmodels_DimEmployees.csv,1661,1715329615000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/final_project_data/classicmodels/batch'
json_files = {"customers" : 'Classicmodels_DimCustomers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f33a4a0b7c0>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "upm6ce"
val pwd = "C0nn3ctF0ur"
val clusterName = "cluster0.p4wvwnt"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "classicmodels_dw")
.option("collection", "customers").load()
.select("customer_key","customerNumber", "customerName", "contactLastName", "contactFirstName", "phone", "addressLine", "city", "state", "postalCode", "country", "salesRepEmployeeNumber", "creditLimit")

display(df_customer)

customer_key,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France,1370.0,21000
2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA,1166.0,71800
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Melbourne,Victoria,3004,Australia,1611.0,117300
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",Nantes,,44000,France,1370.0,118200
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,Stavern,,4110,Norway,1504.0,81700
6,124,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,San Rafael,CA,97562,USA,1165.0,210500
7,125,Havel & Zbyszek Co,Piestrzeniewicz,Zbyszek,(26) 642-7555,ul. Filtrowa 68,Warszawa,,01-012,Poland,,0
8,128,"Blauer See Auto, Co.",Keitel,Roland,+49 69 66 90 2555,Lyonerstr. 34,Frankfurt,,60528,Germany,1504.0,59700
9,129,Mini Wheels Co.,Murphy,Julie,6505555787,5557 North Pendale Street,San Francisco,CA,94217,USA,1165.0,64600
10,131,Land of Toys Inc.,Lee,Kwai,2125557818,897 Long Airport Avenue,NYC,NY,10022,USA,1323.0,114900


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (classicmodels_dlh)

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
customerNumber,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
phone,string,
addressLine,string,
city,string,
state,string,
postalCode,string,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_customer LIMIT 5

customer_key,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France,1370,21000
2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA,1166,71800
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Melbourne,Victoria,3004,Australia,1611,117300
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",Nantes,,44000,France,1370,118200
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,Stavern,,4110,Norway,1504,81700


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
employee_csv = f"{batch_dir}/Classicmodels_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employee_key,employeeNumber,lastName,firstName,email,reportsTo,jobTitle
1,1002,Murphy,Diane,dmurphy@classicmodelcars.com,,President
2,1056,Patterson,Mary,mpatterso@classicmodelcars.com,1002.0,VP Sales
3,1076,Firrelli,Jeff,jfirrelli@classicmodelcars.com,1002.0,VP Marketing
4,1088,Patterson,William,wpatterson@classicmodelcars.com,1056.0,Sales Manager (APAC)
5,1102,Bondur,Gerard,gbondur@classicmodelcars.com,1056.0,Sale Manager (EMEA)
6,1143,Bow,Anthony,abow@classicmodelcars.com,1056.0,Sales Manager (NA)
7,1165,Jennings,Leslie,ljennings@classicmodelcars.com,1143.0,Sales Rep
8,1166,Thompson,Leslie,lthompson@classicmodelcars.com,1143.0,Sales Rep
9,1188,Firrelli,Julie,jfirrelli@classicmodelcars.com,1143.0,Sales Rep
10,1216,Patterson,Steve,spatterson@classicmodelcars.com,1143.0,Sales Rep


In [0]:
df_employee.printSchema()

root
 |-- employee_key: integer (nullable = true)
 |-- employeeNumber: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- reportsTo: string (nullable = true)
 |-- jobTitle: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_employee;

col_name,data_type,comment
employee_key,int,
employeeNumber,int,
lastName,string,
firstName,string,
email,string,
reportsTo,string,
jobTitle,string,
,,
# Delta Statistics Columns,,
Column Names,"email, reportsTo, lastName, firstName, employee_key, employeeNumber, jobTitle",


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_employee LIMIT 5;

employee_key,employeeNumber,lastName,firstName,email,reportsTo,jobTitle
1,1002,Murphy,Diane,dmurphy@classicmodelcars.com,,President
2,1056,Patterson,Mary,mpatterso@classicmodelcars.com,1002.0,VP Sales
3,1076,Firrelli,Jeff,jfirrelli@classicmodelcars.com,1002.0,VP Marketing
4,1088,Patterson,William,wpatterson@classicmodelcars.com,1056.0,Sales Manager (APAC)
5,1102,Bondur,Gerard,gbondur@classicmodelcars.com,1056.0,Sale Manager (EMEA)


##### Verify Dimension Tables

In [0]:
%sql
USE classicmodels_dlh;
SHOW TABLES

database,tableName,isTemporary
classicmodels_dlh,dim_customer,False
classicmodels_dlh,dim_date,False
classicmodels_dlh,dim_employee,False
classicmodels_dlh,dim_product,False
,view_date,True
,view_product,True


### Section III: Integrate Reference Data with Real-Time Data
#### 1.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 1.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,fact_order_key,orderNumber,order_date_key,priceEach,product_key,quantityOrdered,required_date_key,shipped_date_key,status,_rescued_data,receipt_time,source_file
83,1998,10260,20040616,121.57,39,32,20040622,,Cancelled,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
66,1999,10221,20040218,133.86,39,33,20040226,20040219.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
66,2000,10273,20040721,117.47,39,40,20040728,20040722.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
47,2001,10232,20040320,133.86,39,22,20040330,20040325.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
94,2002,10372,20050126,131.13,39,28,20050205,20050128.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
85,2003,10414,20050506,128.39,39,41,20050513,,On Hold,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
50,2004,10293,20040909,110.64,39,24,20040918,20040914.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
5,2005,10325,20041105,44.37,88,38,20041113,20041108.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
11,2006,10104,20030131,51.95,88,35,20030209,20030201.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json
11,2007,10246,20040505,45.45,88,35,20040513,20040506.0,Shipped,,2024-05-11T00:33:04.076Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders03.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3388ba2dd0>

##### 1.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_key,fact_order_key,orderNumber,order_date_key,priceEach,product_key,quantityOrdered,required_date_key,shipped_date_key,status,_rescued_data,receipt_time,source_file
86,1,10100,20030106,136.0,23,30,20030113,20030110.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
11,2,10379,20050210,156.4,23,39,20050218,20050211.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
57,3,10173,20031105,168.3,23,24,20031115,20031109.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
118,4,10331,20041117,154.7,23,44,20041123,20041123.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
30,5,10110,20030318,153.0,23,42,20030324,20030320.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
6,6,10182,20031112,159.8,23,44,20031121,20031118.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
6,7,10312,20041021,146.2,23,48,20041027,20041023.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
80,8,10344,20041125,168.3,23,45,20041202,20041129.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
2,9,10124,20030521,153.0,23,21,20030529,20030525.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json
107,10,10214,20040126,166.6,23,30,20040204,20040129.0,Shipped,,2024-05-11T00:33:19.207Z,dbfs:/FileStore/final_project_data/classicmodels/stream/orders/Classicmodels_FactOrders01.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_order_key,bigint,
orderNumber,bigint,
order_date_key,bigint,
priceEach,double,
product_key,bigint,
quantityOrdered,bigint,
required_date_key,bigint,
shipped_date_key,bigint,
status,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.orderNumber,
      o.customer_key,
      c.customerNumber,
      c.customerName,
      c.contactLastName,
      c.contactFirstName,
      o.product_key,
      p.productCode,
      p.productName, 
      p.productLine,
      p.buyPrice AS product_buy_price,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.required_date_key,
      rd.day_name_of_week AS required_day_name_of_week,
      rd.day_of_month AS required_day_of_month,
      rd.weekday_weekend AS required_weekday_weekend,
      rd.month_name AS required_month_name,
      rd.calendar_quarter AS required_quarter,
      rd.calendar_year AS required_year,
      o.shipped_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_quarter,
      sd.calendar_year AS shipped_year,
      o.priceEach,
      o.quantityOrdered,
      o.status
  FROM orders_silver_tempview AS o
  INNER JOIN classicmodels_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN classicmodels_dlh.dim_product AS p
  ON p.product_key = o.product_key
  INNER JOIN classicmodels_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  INNER JOIN classicmodels_dlh.dim_date AS rd
  ON rd.date_key = o.required_date_key
  INNER JOIN classicmodels_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3388bbd8a0>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,orderNumber,customer_key,customerNumber,customerName,contactLastName,contactFirstName,product_key,productCode,productName,productLine,product_buy_price,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,required_date_key,required_day_name_of_week,required_day_of_month,required_weekday_weekend,required_month_name,required_quarter,required_year,shipped_date_key,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_quarter,shipped_year,priceEach,quantityOrdered,status


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
orderNumber,bigint,
customer_key,bigint,
customerNumber,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
product_key,bigint,
productCode,varchar(65535),
productName,varchar(65535),


##### 1.3. Gold Table: Perform Aggregations
Quantity ordered for each customer, per product line and per order quarter, including CustomerNumber, CustomerName, OrderQuarter, productLine, and TotalQuantityOrdered

In [0]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_orders_gold AS (
  SELECT customer_key AS CustomerID
    , customerName AS Customer
    , order_quarter AS OrderQuarter
    , productLine
    , COUNT(quantityOrdered) AS TotalQuantityOrdered
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerID, CustomerName, OrderQuarter, ProductLine
  ORDER BY TotalQuantityOrdered DESC);

SELECT * FROM classicmodels_dlh.fact_orders_gold;

CustomerID,Customer,OrderQuarter,productLine,TotalQuantityOrdered
11,Euro+ Shopping Channel,1,Classic Cars,50
11,Euro+ Shopping Channel,4,Classic Cars,35
6,Mini Gifts Distributors Ltd.,4,Vintage Cars,30
6,Mini Gifts Distributors Ltd.,1,Classic Cars,29
6,Mini Gifts Distributors Ltd.,3,Classic Cars,28
53,"Toms SpezialitÃ¤ten, Ltd",4,Classic Cars,23
6,Mini Gifts Distributors Ltd.,1,Trucks and Buses,22
43,Heintze Collectables,4,Classic Cars,20
16,Muscle Machine Inc,4,Classic Cars,19
57,Rovelli Gifts,4,Vintage Cars,18


#### 2.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/final_project_data/