## DS 2002: Final Project
#### Eva Terry, bzt4em

The data from this notebook comes from my fact and dimension tables created in my midterm project. It comes from the mysqlsampledatabase.sql which is in the class GitHub and creates the classicmodels schema. This creates classic_dw, which can also be created by running dump_classic_dw.sql. 

In addition to the date dimension, I have a dimension for customers, products, order details, and orders. 
### Section I: Setup

#### Import Required Libraries

In [0]:
# code is from Lab 6
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "bzt4em-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classic_dw"

connection_properties = {
  "user" : "bzt4em",
  "password" : "Rollins1995!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.ix8khjp"
atlas_database_name = "classic_dw"
atlas_user_name = "bzt4em"
atlas_password = "nYECGm3XXrAK5jqy"

# Data Files (JSON) Information ###############################
dst_database = "classic_dlh"

base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### Define Global Functions (directly from Lab 6)

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Creating a New Databricks Metadata Database.

In [0]:
%sql
-- allows the user to start from scratch
DROP DATABASE IF EXISTS classic_dlh CASCADE;

In [0]:
%sql
-- creates the classic_dlh database, it is empty after execution of this statement
CREATE DATABASE IF NOT EXISTS classic_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final_data/classic_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### Creating a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 
This satisfies the requirement of populating from a relational databased like Azure MySQL. This is also satisfying the requirement for a date dimension. 

In [0]:
%sql
-- creating a temporary view for dim_date, which is loaded from Azure MySQL
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://bzt4em-mysql.mysql.database.azure.com:3306/classic_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "bzt4em",    --Replace with your User Name
  password "Rollins1995!"  --Replace with you password
)

In [0]:
%sql
-- creating the date dimension in my classic Data Lakehouse
USE DATABASE classic_dlh;

CREATE OR REPLACE TABLE classic_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/classic_dlh/dim_date"
AS SELECT * FROM view_date;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- showing the column names and types for dim_date
DESCRIBE EXTENDED classic_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- showing the first 5 instances of dim_date
SELECT * FROM classic_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Creating a New Table that Sources Product Dimension Data from an Azure MySQL database.
This also satisfies the requirement for data originating from a relational database (Azure MySQL). It also adds the first additional dimension table (1/3 required).

In [0]:
%sql
-- Creating a Temporary View named "view_product" that extracts data from my classic_dw database.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://bzt4em-mysql.mysql.database.azure.com:3306/classic_dw", --Replace with your Server Name
  dbtable "dim_products",
  user "bzt4em",    --Replace with your User Name
  password "Rollins1995!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE classic_dlh;

-- Create a new table named "classic_dlh.dim_product" using data from the view named "view_product"
CREATE OR REPLACE TABLE classic_dlh.dim_product
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/classic_dlh/dim_products"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- displaying the column names and types for dim_product
DESCRIBE EXTENDED classic_dlh.dim_product;

col_name,data_type,comment
productCode,varchar(15),
productName,varchar(70),
productLine,varchar(50),
productScale,varchar(10),
productVendor,varchar(50),
productDescription,varchar(65535),
quantityInStock,int,
buyPrice,"decimal(10,2)",
MSRP,"decimal(10,2)",
,,


In [0]:
%sql
-- displaying the first 5 instances of dim_product
SELECT * FROM classic_dlh.dim_product LIMIT 5

productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front suspension, gear-shift lever, footbrake lever, drive chain, wheels and steering. All parts are particularly delicate due to their precise scale and require special care and attention.",7933,48.81,95.7
S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddle bags located on side of motorcycle, detailed engine, working steering, working suspension, two leather seats, luggage rack, dual exhaust pipes, small saddle bag located on handle bars, two-tone paint with chrome accents, superior die-cast detail , rotating wheels , working kick stand, diecast metal with plastic parts and baked enamel finish.",6625,68.99,118.94
S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos and insignias, detachable rear wheelie bar, heavy diecast metal with resin parts, authentic multi-color tampo-printed graphics, separate engine drive belts, free-turning front fork, rotating tires and rear racing slick, certificate of authenticity, detailed engine, display stand , precision diecast replica, baked enamel finish, 1:10 scale model, removable fender, seat and tank cover piece for displaying the superior detail of the v-twin engine",5582,91.02,193.66
S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0


#### Fetch Reference Data from a MongoDB Atlas Database
##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  
# showing the files in the batch directory of my file store

path,name,size,modificationTime
dbfs:/FileStore/final_data/retail/batch/classic_dim_customers.json,classic_dim_customers.json,49906,1714681900000
dbfs:/FileStore/final_data/retail/batch/classic_dim_order_details.csv,classic_dim_order_details.csv,30527,1714681900000
dbfs:/FileStore/final_data/retail/batch/classic_dim_orders.csv,classic_dim_orders.csv,25308,1714681900000


##### Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/final_data/retail/batch'
json_files = {"customers" : 'classic_dim_customers.json'}


set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f54e4724300>

##### Fetch Customer Dimension Data from the New MongoDB Collection

This satisfies the requirement to read data from a NoSQL database like MongoDB Atlas.

In [0]:
%scala
import com.mongodb.spark._
// setting the variables using my Atlas credentials and cluster name
val userName = "bzt4em"
val pwd = "nYECGm3XXrAK5jqy"
val clusterName = "cluster0.ix8khjp"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala
// reading from MongoDB and selecting columns
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "classic_dw")
.option("collection", "customers").load()
.select("customerKey","customerNumber","customerName","contactLastName","contactFirstName","phone","addressLine1","addressLine2","city","state","postalCode","country")
// showing the results
display(df_customer)

customerKey,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France
2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway
6,124,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,,San Rafael,CA,97562,USA
7,125,Havel & Zbyszek Co,Piestrzeniewicz,Zbyszek,(26) 642-7555,ul. Filtrowa 68,,Warszawa,,01-012,Poland
8,128,"Blauer See Auto, Co.",Keitel,Roland,+49 69 66 90 2555,Lyonerstr. 34,,Frankfurt,,60528,Germany
9,129,Mini Wheels Co.,Murphy,Julie,6505555787,5557 North Pendale Street,,San Francisco,CA,94217,USA
10,131,Land of Toys Inc.,Lee,Kwai,2125557818,897 Long Airport Avenue,,NYC,NY,10022,USA


In [0]:
%scala
// printing the schema, which shows the columns and basic information about them
df_customer.printSchema()

##### Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (classic_dlh)

 This satisfies the requirement for a second additional dimension table dim_products (2/3 required).

In [0]:
%scala
// writing a delta table for the customer dimension
df_customer.write.format("delta").mode("overwrite").saveAsTable("classic_dlh.dim_customer")

In [0]:
%sql
-- showing the customer column names and types
DESCRIBE EXTENDED classic_dlh.dim_customer

col_name,data_type,comment
customerKey,int,
customerNumber,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
phone,string,
addressLine1,string,
addressLine2,string,
city,string,
state,string,


In [0]:
%sql
-- showing the first 5 customer instances
SELECT * FROM classic_dlh.dim_customer LIMIT 5

customerKey,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France
2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway


#### Fetch Data from a File System
##### Use PySpark to Read From a CSV File

This satisfies the final data source requirement: data originating from files on a cloud-based file system like the DBFS.

In [0]:
# reading in the order details information using spark
classic_dim_order_details_csv = f"{batch_dir}/classic_dim_order_details.csv"

df_order_details = spark.read.format('csv').options(header='true', inferSchema='true').load(classic_dim_order_details_csv)
display(df_order_details)

orderDetailsKey,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
1,10100,S18_1749,30,136.0,3
2,10100,S18_2248,50,55.09,2
3,10100,S18_4409,22,75.46,4
4,10100,S24_3969,49,35.29,1
5,10101,S18_2325,25,108.06,4
6,10101,S18_2795,26,167.06,1
7,10101,S24_1937,45,32.53,3
8,10101,S24_2022,46,44.35,2
9,10102,S18_1342,39,95.55,2
10,10102,S18_1367,41,43.13,1


In [0]:
# showing the details of the dataframe
df_order_details.printSchema()

root
 |-- orderDetailsKey: integer (nullable = true)
 |-- orderNumber: integer (nullable = true)
 |-- productCode: string (nullable = true)
 |-- quantityOrdered: integer (nullable = true)
 |-- priceEach: double (nullable = true)
 |-- orderLineNumber: integer (nullable = true)



This satisfies the requirement for a third additional dimension table dim_order_details (3/3 required).

In [0]:
# creating the order_details dimension table
df_order_details.write.format("delta").mode("overwrite").saveAsTable("classic_dlh.dim_order_details")

In [0]:
%sql
-- showing the column names and types
DESCRIBE EXTENDED classic_dlh.dim_order_details;

col_name,data_type,comment
orderDetailsKey,int,
orderNumber,int,
productCode,string,
quantityOrdered,int,
priceEach,double,
orderLineNumber,int,
,,
# Delta Statistics Columns,,
Column Names,"orderNumber, priceEach, orderLineNumber, orderDetailsKey, productCode, quantityOrdered",
Column Selection Method,first-32,


In [0]:
%sql
-- showing the first 5 instances of order_details
SELECT * FROM classic_dlh.dim_order_details LIMIT 5;

orderDetailsKey,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
1,10100,S18_1749,30,136.0,3
2,10100,S18_2248,50,55.09,2
3,10100,S18_4409,22,75.46,4
4,10100,S24_3969,49,35.29,1
5,10101,S18_2325,25,108.06,4


##### Using PySpark to Read Orders Dimension Data from CSV File

In [0]:
# repeating the same process above for the orders dimension
orders_csv = f"{batch_dir}/classic_dim_orders.csv"

df_orders = spark.read.format('csv').options(header='true', inferSchema='true').load(orders_csv)
display(df_orders)

orderKey,orderNumber,requiredDate,shippedDate,status,comments,customerNumber,orderDateKey
1,10100,2003-01-13,2003-01-10,Shipped,,363,20030106
2,10101,2003-01-18,2003-01-11,Shipped,Check on availability.,128,20030109
3,10102,2003-01-18,2003-01-14,Shipped,,181,20030110
4,10103,2003-02-07,2003-02-02,Shipped,,121,20030129
5,10104,2003-02-09,2003-02-01,Shipped,,141,20030131
6,10105,2003-02-21,2003-02-12,Shipped,,145,20030211
7,10106,2003-02-24,2003-02-21,Shipped,,278,20030217
8,10107,2003-03-03,2003-02-26,Shipped,Difficult to negotiate with customer. We need more marketing materials,131,20030224
9,10108,2003-03-12,2003-03-08,Shipped,,385,20030303
10,10109,2003-03-19,2003-03-11,Shipped,Customer requested that FedEx Ground is used for this shipping,486,20030310


In [0]:
# printing the details for the dataframe of orders
df_orders.printSchema()

root
 |-- orderKey: integer (nullable = true)
 |-- orderNumber: integer (nullable = true)
 |-- requiredDate: date (nullable = true)
 |-- shippedDate: string (nullable = true)
 |-- status: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- customerNumber: integer (nullable = true)
 |-- orderDateKey: integer (nullable = true)



Adding a fourth dimension table (4/3 required).



In [0]:
# creating the order dimension delta table
df_orders.write.format("delta").mode("overwrite").saveAsTable("classic_dlh.dim_orders")

In [0]:
%sql
-- showing the column names and typs for dim_orders
DESCRIBE EXTENDED classic_dlh.dim_orders;

col_name,data_type,comment
orderKey,int,
orderNumber,int,
requiredDate,date,
shippedDate,string,
status,string,
comments,string,
customerNumber,int,
orderDateKey,int,
,,
# Delta Statistics Columns,,


In [0]:
%sql
-- showing the first 5 instances of the orders dimension
SELECT * FROM classic_dlh.dim_orders LIMIT 5;

orderKey,orderNumber,requiredDate,shippedDate,status,comments,customerNumber,orderDateKey
1,10100,2003-01-13,2003-01-10,Shipped,,363,20030106
2,10101,2003-01-18,2003-01-11,Shipped,Check on availability.,128,20030109
3,10102,2003-01-18,2003-01-14,Shipped,,181,20030110
4,10103,2003-02-07,2003-02-02,Shipped,,121,20030129
5,10104,2003-02-09,2003-02-01,Shipped,,141,20030131


##### Verify Dimension Tables

Displays the dimension tables for classic_dlh which are dim_customer, dim_date, dim_order_details, dim_orders, and dim_product. This satisfies the design requirements for the date dimension, 3+ additional dimension tables, and data from 3 sources. 

In [0]:
%sql
USE classic_dlh;
SHOW TABLES

database,tableName,isTemporary
classic_dlh,dim_customer,False
classic_dlh,dim_date,False
classic_dlh,dim_order_details,False
classic_dlh,dim_orders,False
classic_dlh,dim_product,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,orders_bronze_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### Using AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### Bronze Table: Process 'Raw' JSON Data
I exported my fact table into 3 ranges of roughly equal size into 3 separate json files which are in the orders_stream_dir. This demonstrates integrating streaming data since the data is from separate JSON files

This satisfies the requirement of a fact table that models the business process. This fact table is read in as created by my midterm project, and is later altered and combined with dimension tables to create fact_orders_silver.

In [0]:
# using spark to read in my JSON files with fact order information
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
-- Adding Metadata for Traceability
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
-- displaying the results
SELECT * FROM orders_bronze_tempview

customerKey,customerName,factOrdersKey,orderDateKey,orderDetailsKey,orderKey,orderNumber,orderStatus,orderTotalPrice,productName,quantityOrdered,requiredDateKey,shippedDateKey,unitPrice,_rescued_data,receipt_time,source_file
86,Online Diecast Creations Co.,1,20030106,1,1,10100,Shipped,4080.0,1917 Grand Touring Sedan,30,20030113,20030110.0,136.0,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
57,Rovelli Gifts,2,20031105,661,74,10173,Shipped,4039.2,1917 Grand Touring Sedan,24,20031115,20031109.0,168.3,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
30,"AV Stores, Co.",3,20030318,104,11,10110,Shipped,6426.0,1917 Grand Touring Sedan,42,20030324,20030320.0,153.0,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
6,Mini Gifts Distributors Ltd.,4,20031112,767,83,10182,Shipped,7031.200000000001,1917 Grand Touring Sedan,44,20031121,20031118.0,159.8,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
2,Signal Gift Stores,5,20030521,215,25,10124,Shipped,3213.0,1917 Grand Touring Sedan,21,20030529,20030525.0,153.0,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
16,Muscle Machine Inc,6,20031202,981,105,10204,Shipped,5049.0,1917 Grand Touring Sedan,33,20031210,20031204.0,153.0,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
122,Kelly's Gift Shop,7,20030707,325,39,10138,Shipped,4936.8,1917 Grand Touring Sedan,33,20030716,20030713.0,149.6,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
119,Signal Collectibles Ltd.,8,20030912,444,50,10149,Shipped,5317.6,1917 Grand Touring Sedan,34,20030918,20030917.0,156.4,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
69,Corporate Gift Ideas Co.,9,20031018,553,63,10162,Shipped,4091.9,1917 Grand Touring Sedan,29,20031026,20031019.0,141.1,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
111,"Australian Collectables, Ltd",10,20031121,882,94,10193,Shipped,3213.0,1917 Grand Touring Sedan,21,20031128,20031127.0,153.0,,2024-05-06T16:25:08.446Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json


In [0]:
# writing the detla table to fact_orders_bronze
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f54b4ae7310>

##### Silver Table: Illustrating the Relationship Between "Real-Time" Fact Data and Including Reference Data

In [0]:
# creating the silver temporary view
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
-- displaying the results
SELECT * FROM orders_silver_tempview

customerKey,customerName,factOrdersKey,orderDateKey,orderDetailsKey,orderKey,orderNumber,orderStatus,orderTotalPrice,productName,quantityOrdered,requiredDateKey,shippedDateKey,unitPrice,_rescued_data,receipt_time,source_file
86,Online Diecast Creations Co.,1,20030106,1,1,10100,Shipped,4080.0,1917 Grand Touring Sedan,30,20030113,20030110.0,136.0,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
57,Rovelli Gifts,2,20031105,661,74,10173,Shipped,4039.2,1917 Grand Touring Sedan,24,20031115,20031109.0,168.3,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
30,"AV Stores, Co.",3,20030318,104,11,10110,Shipped,6426.0,1917 Grand Touring Sedan,42,20030324,20030320.0,153.0,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
6,Mini Gifts Distributors Ltd.,4,20031112,767,83,10182,Shipped,7031.200000000001,1917 Grand Touring Sedan,44,20031121,20031118.0,159.8,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
2,Signal Gift Stores,5,20030521,215,25,10124,Shipped,3213.0,1917 Grand Touring Sedan,21,20030529,20030525.0,153.0,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
16,Muscle Machine Inc,6,20031202,981,105,10204,Shipped,5049.0,1917 Grand Touring Sedan,33,20031210,20031204.0,153.0,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
122,Kelly's Gift Shop,7,20030707,325,39,10138,Shipped,4936.8,1917 Grand Touring Sedan,33,20030716,20030713.0,149.6,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
119,Signal Collectibles Ltd.,8,20030912,444,50,10149,Shipped,5317.6,1917 Grand Touring Sedan,34,20030918,20030917.0,156.4,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
69,Corporate Gift Ideas Co.,9,20031018,553,63,10162,Shipped,4091.9,1917 Grand Touring Sedan,29,20031026,20031019.0,141.1,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json
111,"Australian Collectables, Ltd",10,20031121,882,94,10193,Shipped,3213.0,1917 Grand Touring Sedan,21,20031128,20031127.0,153.0,,2024-05-06T16:26:15.958Z,dbfs:/FileStore/final_data/retail/stream/orders/classic_fact_orders_01.json


In [0]:
%sql
-- showing the columns and types for the silver table
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customerKey,bigint,
customerName,string,
factOrdersKey,bigint,
orderDateKey,bigint,
orderDetailsKey,bigint,
orderKey,bigint,
orderNumber,bigint,
orderStatus,string,
orderTotalPrice,double,
productName,string,


Joining the fact table with the dimension tables at the Silver table phase

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.factOrdersKey,
      o.orderKey,
      o.customerKey,
      c.contactFirstName AS customer_first_name,
      c.contactLastName AS customer_last_name,
      p.productCode,
      p.productName,
      p.buyPrice AS product_buy_price,
      p.quantityInStock AS product_quantity_in_stock,
      o.orderDateKey,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.requiredDateKey,
      rd.day_name_of_week AS required_day_name_of_week,
      rd.day_of_month AS required_day_of_month,
      rd.weekday_weekend AS required_weekday_weekend,
      rd.month_name AS required_month_name,
      rd.calendar_quarter AS required_calendar_quarter,
      rd.calendar_year AS required_calendar_year,
      o.shippedDateKey,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,
      o.quantityOrdered,
      o.unitPrice,
      o.orderTotalPrice
  FROM orders_silver_tempview AS o
  INNER JOIN classic_dlh.dim_customer AS c
  ON c.customerKey = o.customerKey
  INNER JOIN classic_dlh.dim_product AS p
  ON p.productName = o.productName
  LEFT OUTER JOIN classic_dlh.dim_date AS od
  ON od.date_key = o.orderDateKey
  LEFT OUTER JOIN classic_dlh.dim_date AS rd
  ON rd.date_key = o.requiredDateKey
  LEFT OUTER JOIN classic_dlh.dim_date AS sd
  ON sd.date_key = o.shippedDateKey
)

In [0]:
# writing the delta table to the fact_orders_silver
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f54b4af55a0>

In [0]:
%sql
-- displaying the results
SELECT * FROM fact_orders_silver

factOrdersKey,orderKey,customerKey,customer_first_name,customer_last_name,productCode,productName,product_buy_price,product_quantity_in_stock,orderDateKey,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,requiredDateKey,required_day_name_of_week,required_day_of_month,required_weekday_weekend,required_month_name,required_calendar_quarter,required_calendar_year,shippedDateKey,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_calendar_quarter,shipped_calendar_year,quantityOrdered,unitPrice,orderTotalPrice
1,1,86,Dorothy,Young,S18_1749,1917 Grand Touring Sedan,86.7,2724,20030106,Monday,6,Weekday,January,1,2003,20030113,Monday,13,Weekday,January,1,2003,20030110.0,Friday,10.0,Weekday,January,1.0,2003.0,30,136.0,4080.0
2,74,57,Giovanni,Rovelli,S18_1749,1917 Grand Touring Sedan,86.7,2724,20031105,Wednesday,5,Weekday,November,4,2003,20031115,Saturday,15,Weekend,November,4,2003,20031109.0,Sunday,9.0,Weekend,November,4.0,2003.0,24,168.3,4039.2
3,11,30,Rachel,Ashworth,S18_1749,1917 Grand Touring Sedan,86.7,2724,20030318,Tuesday,18,Weekday,March,1,2003,20030324,Monday,24,Weekday,March,1,2003,20030320.0,Thursday,20.0,Weekday,March,1.0,2003.0,42,153.0,6426.0
4,83,6,Susan,Nelson,S18_1749,1917 Grand Touring Sedan,86.7,2724,20031112,Wednesday,12,Weekday,November,4,2003,20031121,Friday,21,Weekday,November,4,2003,20031118.0,Tuesday,18.0,Weekday,November,4.0,2003.0,44,159.8,7031.200000000001
5,25,2,Jean,King,S18_1749,1917 Grand Touring Sedan,86.7,2724,20030521,Wednesday,21,Weekday,May,2,2003,20030529,Thursday,29,Weekday,May,2,2003,20030525.0,Sunday,25.0,Weekend,May,2.0,2003.0,21,153.0,3213.0
6,105,16,Jeff,Young,S18_1749,1917 Grand Touring Sedan,86.7,2724,20031202,Tuesday,2,Weekday,December,4,2003,20031210,Wednesday,10,Weekday,December,4,2003,20031204.0,Thursday,4.0,Weekday,December,4.0,2003.0,33,153.0,5049.0
7,39,122,Tony,Snowden,S18_1749,1917 Grand Touring Sedan,86.7,2724,20030707,Monday,7,Weekday,July,3,2003,20030716,Wednesday,16,Weekday,July,3,2003,20030713.0,Sunday,13.0,Weekend,July,3.0,2003.0,33,149.6,4936.8
8,50,119,Sue,Taylor,S18_1749,1917 Grand Touring Sedan,86.7,2724,20030912,Friday,12,Weekday,September,3,2003,20030918,Thursday,18,Weekday,September,3,2003,20030917.0,Wednesday,17.0,Weekday,September,3.0,2003.0,34,156.4,5317.6
9,63,69,Julie,Brown,S18_1749,1917 Grand Touring Sedan,86.7,2724,20031018,Saturday,18,Weekend,October,4,2003,20031026,Sunday,26,Weekend,October,4,2003,20031019.0,Sunday,19.0,Weekend,October,4.0,2003.0,29,141.1,4091.9
10,94,111,Sean,Clenahan,S18_1749,1917 Grand Touring Sedan,86.7,2724,20031121,Friday,21,Weekday,November,4,2003,20031128,Friday,28,Weekday,November,4,2003,20031127.0,Thursday,27.0,Weekday,November,4.0,2003.0,21,153.0,3213.0


In [0]:
%sql
-- describing the columns and types for the silver table
DESCRIBE EXTENDED classic_dlh.fact_orders_silver

col_name,data_type,comment
factOrdersKey,bigint,
orderKey,bigint,
customerKey,bigint,
customer_first_name,string,
customer_last_name,string,
productCode,varchar(15),
productName,varchar(70),
product_buy_price,"decimal(10,2)",
product_quantity_in_stock,int,
orderDateKey,bigint,


##### Gold Table: Perform Aggregations
Creatint a new Gold table using the CTAS approach. This table will have an instance for each customer and month, ordered by the number of products they bought in that month. 

This table demonstrates the business value of the solution by summarizing by customer and order details. 

In [0]:
%sql
CREATE OR REPLACE TABLE classic_dlh.fact_orders_by_customer_and_month_gold AS (
  SELECT customerKey AS CustomerID
    , customer_first_name AS FirstName
    , customer_last_name AS LastName
    , order_month_name AS OrderMonth
    , COUNT(productName) AS ProductCount
  FROM classic_dlh.fact_orders_silver
  GROUP BY CustomerID, LastName, FirstName, OrderMonth
  ORDER BY ProductCount DESC);

SELECT * FROM classic_dlh.fact_orders_by_customer_and_month_gold;

CustomerID,FirstName,LastName,OrderMonth,ProductCount
69,Julie,Brown,October,28
20,Veysel,Oeztan,November,25
15,Eric,Natividad,October,18
18,Juri,Hashimoto,October,18
57,Giovanni,Rovelli,February,18
6,Susan,Nelson,November,17
6,Susan,Nelson,July,17
107,Martín,Sommer,May,17
16,Jeff,Young,December,17
80,Laurence,Lebihan,May,17


Further demonstrating business value by querying the fact orders table alongside a couple of the dimension tables. 

In [0]:
%sql
-- displaying the products in order of how much revenue each generated
SELECT p.productName AS Product
    , SUM(o.quantityOrdered) AS `Total Quantity Ordered`
    , SUM(o.orderTotalPrice) AS `Total Revenue on Product`
  FROM classic_dlh.fact_orders_silver AS o
  INNER JOIN classic_dlh.dim_product AS p
  GROUP BY Product
  ORDER BY `Total Revenue on Product` DESC;

Product,Total Quantity Ordered,Total Revenue on Product
1996 Moto Guzzi 1100i,34603,3136746.7100000004
1936 Chrysler Airflow,34603,3136746.7100000004
18th Century Vintage Horse Carriage,34603,3136746.7100000004
The Titanic,34603,3136746.7100000004
1958 Setra Bus,34603,3136746.7100000004
Diamond T620 Semi-Skirted Tanker,34603,3136746.7100000004
2001 Ferrari Enzo,34603,3136746.7100000004
1930 Buick Marquette Phaeton,34603,3136746.7100000004
The Queen Mary,34603,3136746.7100000004
The Mayflower,34603,3136746.7100000004


In [0]:
%sql
-- displaying the customers in order of how much each spent
SELECT c.customerName AS Customer,
        SUM(o.quantityOrdered) AS `Total Items Ordered`,
        SUM(o.orderTotalPrice) AS `Total Spent`
    FROM classic_dlh.fact_orders_silver AS o
    INNER JOIN classic_dlh.dim_customer AS c
    ON o.customerKey = c.customerKey
    GROUP BY Customer
    ORDER BY `Total Spent` DESC;

Customer,Total Items Ordered,Total Spent
Euro+ Shopping Channel,2153,189840.15
Mini Gifts Distributors Ltd.,1898,167783.08000000002
"Dragon Souveniers, Ltd.",1452,150123.15000000002
Muscle Machine Inc,1159,117634.88000000002
Mini Creations Ltd.,1044,93565.24
Rovelli Gifts,1155,89875.59999999999
Technics Stores Inc.,1027,89418.78
Corporate Gift Ideas Co.,976,85559.12
Herkku Gifts,823,85024.45999999999
"Anna's Decorations, Ltd",874,80101.92


#### Clean up the File System

In [0]:
%fs rm -r /FileStore/final_data/