## Project 2 : Capstone 


The "classicmodels" database is a vendor of scale vintage cars. This data includes customers, products, orders, order details, payments, employees, offices, and many more. 

###  Run the libraries and Establish a connection

Running the Libraries 

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType


Establishing Connections 

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysql-hmt3bc.mysql.database.azure.com"  
jdbc_port = 3306
src_database = "classicmodels_dw"

connection_properties = {
  "user" : "hmt3bc",
  "password" : "19@l3X712002",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "car.ekddkan"
atlas_database_name = "classicmodels_dw_mongo"
atlas_user_name = "daphnepfoser"
atlas_password = "q1Yp5OIGlVDnMsjP"

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/data" 
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders" # ../data/stream/orders
#orders_stream_dir = f"{stream_dir}/orders"
#purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
#inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze" 
orders_output_silver = f"{database_dir}/fact_orders/silver" 
orders_output_gold   = f"{database_dir}/fact_orders/gold" 

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
#dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
#dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
#dbutils.fs.rm(database_dir, True)

True

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Populate Dimensions by Ingesting Reference (Cold-path) Data 


 Get data from Azure MySQL database:


Create a new database "classicmodels_dlh"

In [0]:
%sql
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS-2002 Project2"
LOCATION "dbfs:/FileStore/lab_data/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project2");

Create a New Table that sources Date Dimension Table from Azure MySQL

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-hmt3bc.mysql.database.azure.com:3306/classicmodels_dw", 
  dbtable "dim_date",
  user "hmt3bc",   
  password "19@l3X712002" 
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20030101,2003-01-01,2003/01/01,01/01/2003,01/01/2003,4,Wednesday,1,1,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030102,2003-01-02,2003/01/02,01/02/2003,02/01/2003,5,Thursday,2,2,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030103,2003-01-03,2003/01/03,01/03/2003,03/01/2003,6,Friday,3,3,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030104,2003-01-04,2003/01/04,01/04/2003,04/01/2003,7,Saturday,4,4,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030105,2003-01-05,2003/01/05,01/05/2003,05/01/2003,1,Sunday,5,5,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3


 Create a New Table that Sources Product Dimension Data from Azure MySQL

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-hmt3bc.mysql.database.azure.com:3306/classicmodels_dw", --Replace with your Server Name
  dbtable "df_products",
  user "hmt3bc",    
  password "19@l3X712002"  
)
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.

In [0]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_product
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/classicmodels_dlh/dim_product"
AS SELECT * FROM view_product

-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_product"

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
product_id,varchar(65535),
product_name,varchar(65535),
product_line,varchar(65535),
quantity_in_stock,bigint,
buy_price,double,
MSRP,double,
,,
# Delta Statistics Columns,,
Column Names,"MSRP, buy_price, product_key, quantity_in_stock, product_name, product_id, product_line",


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_product LIMIT 5

product_key,product_id,product_name,product_line,quantity_in_stock,buy_price,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,3252,85.68,136.0


#### Get Reference Data from a MongoDB Atlas Database


View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/data/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/data/batch/df_customers.csv,df_customers.csv,7864,1715123096000
dbfs:/FileStore/lab_data/data/batch/df_productlines.json,df_productlines.json,3780,1715123096000


 Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection


In [0]:
source_dir = '/dbfs/FileStore/lab_data/data/batch'
json_files = {"productlines" : 'df_productlines.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f9c74e61880>

Get "productlines" Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "daphnepfoser"
val pwd = "q1Yp5OIGlVDnMsjP"
val clusterName = "car.ekddkan"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_productlines = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "classicmodels_dw_mongo")
.option("collection", "productlines")
.option("uri", atlas_uri).load()
.select("productline_key","product_line","description")

display(df_productlines)

productline_key,product_line,description
1,Classic Cars,"Attention car enthusiasts: Make your wildest car ownership dreams come true. Whether you are looking for classic muscle cars, dream sports cars or movie-inspired miniatures, you will find great choices in this category. These replicas feature superb attention to detail and craftsmanship and offer features such as working steering system, opening forward compartment, opening rear trunk with removable spare wheel, 4-wheel independent spring suspension, and so on. The models range in size from 1:10 to 1:24 scale and include numerous limited edition and several out-of-production vehicles. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office."
2,Motorcycles,"Our motorcycles are state of the art replicas of classic as well as contemporary motorcycle legends such as Harley Davidson, Ducati and Vespa. Models contain stunning details such as official logos, rotating wheels, working kickstand, front suspension, gear-shift lever, footbrake lever, and drive chain. Materials used include diecast and plastic. The models range in size from 1:10 to 1:50 scale and include numerous limited edition and several out-of-production vehicles. All models come fully assembled and ready for display in the home or office. Most include a certificate of authenticity."
3,Planes,"Unique, diecast airplane and helicopter replicas suitable for collections, as well as home, office or classroom decorations. Models contain stunning details such as official logos and insignias, rotating jet engines and propellers, retractable wheels, and so on. Most come fully assembled and with a certificate of authenticity from their manufacturers."
4,Ships,"The perfect holiday or anniversary gift for executives, clients, friends, and family. These handcrafted model ships are unique, stunning works of art that will be treasured for generations! They come fully assembled and ready for display in the home or office. We guarantee the highest quality, and best value."
5,Trains,"Model trains are a rewarding hobby for enthusiasts of all ages. Whether you're looking for collectible wooden trains, electric streetcars or locomotives, you'll find a number of great choices for any budget within this category. The interactive aspect of trains makes toy trains perfect for young children. The wooden train sets are ideal for children under the age of 5."
6,Trucks and Buses,"The Truck and Bus models are realistic replicas of buses and specialized trucks produced from the early 1920s to present. The models range in size from 1:12 to 1:50 scale and include numerous limited edition and several out-of-production vehicles. Materials used include tin, diecast and plastic. All models include a certificate of authenticity from their manufacturers and are a perfect ornament for the home and office."
7,Vintage Cars,"Our Vintage Car models realistically portray automobiles produced from the early 1900s through the 1940s. Materials used include Bakelite, diecast, plastic and wood. Most of the replicas are in the 1:18 and 1:24 scale sizes, which provide the optimum in detail and accuracy. Prices range from $30.00 up to $180.00 for some special limited edition replicas. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office."


In [0]:
%scala
df_productlines.printSchema()

 Use Spark DataFrame to Create a New "productlines" Dimension Table in the Databricks Metadata Database (classicmodels_dlh)

In [0]:
%scala
df_productlines.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_productlines")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_productlines

col_name,data_type,comment
productline_key,int,
product_line,string,
description,string,
,,
# Delta Statistics Columns,,
Column Names,"productline_key, product_line, description",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_productlines LIMIT 5

productline_key,product_line,description
1,Classic Cars,"Attention car enthusiasts: Make your wildest car ownership dreams come true. Whether you are looking for classic muscle cars, dream sports cars or movie-inspired miniatures, you will find great choices in this category. These replicas feature superb attention to detail and craftsmanship and offer features such as working steering system, opening forward compartment, opening rear trunk with removable spare wheel, 4-wheel independent spring suspension, and so on. The models range in size from 1:10 to 1:24 scale and include numerous limited edition and several out-of-production vehicles. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office."
2,Motorcycles,"Our motorcycles are state of the art replicas of classic as well as contemporary motorcycle legends such as Harley Davidson, Ducati and Vespa. Models contain stunning details such as official logos, rotating wheels, working kickstand, front suspension, gear-shift lever, footbrake lever, and drive chain. Materials used include diecast and plastic. The models range in size from 1:10 to 1:50 scale and include numerous limited edition and several out-of-production vehicles. All models come fully assembled and ready for display in the home or office. Most include a certificate of authenticity."
3,Planes,"Unique, diecast airplane and helicopter replicas suitable for collections, as well as home, office or classroom decorations. Models contain stunning details such as official logos and insignias, rotating jet engines and propellers, retractable wheels, and so on. Most come fully assembled and with a certificate of authenticity from their manufacturers."
4,Ships,"The perfect holiday or anniversary gift for executives, clients, friends, and family. These handcrafted model ships are unique, stunning works of art that will be treasured for generations! They come fully assembled and ready for display in the home or office. We guarantee the highest quality, and best value."
5,Trains,"Model trains are a rewarding hobby for enthusiasts of all ages. Whether you're looking for collectible wooden trains, electric streetcars or locomotives, you'll find a number of great choices for any budget within this category. The interactive aspect of trains makes toy trains perfect for young children. The wooden train sets are ideal for children under the age of 5."


 Get Data from a File System


Use PySpark to Read from a CSV File

#### Customers

In [0]:
customers_csv = f"{batch_dir}/df_customers.csv"

df_customers = spark.read.format('csv').options(delimiter=";", header='true', inferSchema='true').load(customers_csv)
display(df_customers)

customer_key,customer_id,customer_name,city,state,postal_code,country
1,103,Atelier graphique,Nantes,,44000,France
2,112,Signal Gift Stores,Las Vegas,NV,83030,USA
3,114,"Australian Collectors, Co.",Melbourne,Victoria,3004,Australia
4,119,La Rochelle Gifts,Nantes,,44000,France
5,121,Baane Mini Imports,Stavern,,4110,Norway
6,124,Mini Gifts Distributors Ltd.,San Rafael,CA,97562,USA
7,125,Havel & Zbyszek Co,Warszawa,,01-012,Poland
8,128,"Blauer See Auto, Co.",Frankfurt,,60528,Germany
9,129,Mini Wheels Co.,San Francisco,CA,94217,USA
10,131,Land of Toys Inc.,NYC,NY,10022,USA


In [0]:
df_customers.printSchema()

root
 |-- customer_key: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)



In [0]:
df_customers.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_customers")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_customers;

col_name,data_type,comment
customer_key,int,
customer_id,int,
customer_name,string,
city,string,
state,string,
postal_code,string,
country,string,
,,
# Delta Statistics Columns,,
Column Names,"city, customer_id, customer_name, state, country, postal_code, customer_key",


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_customers LIMIT 5;

customer_key,customer_id,customer_name,city,state,postal_code,country
1,103,Atelier graphique,Nantes,,44000,France
2,112,Signal Gift Stores,Las Vegas,NV,83030,USA
3,114,"Australian Collectors, Co.",Melbourne,Victoria,3004,Australia
4,119,La Rochelle Gifts,Nantes,,44000,France
5,121,Baane Mini Imports,Stavern,,4110,Norway


#### Employees

In [0]:
employees_csv = f"{batch_dir}/df_employees.csv"

df_employees = spark.read.format('csv').options(delimiter=";", header='true', inferSchema='true').load(employees_csv)
display(df_employees)

employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,0,President
1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002,VP Sales
1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002,VP Marketing
1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056,Sales Manager (APAC)
1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056,Sale Manager (EMEA)
1143,Bow,Anthony,x5428,abow@classicmodelcars.com,1,1056,Sales Manager (NA)
1165,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143,Sales Rep
1166,Thompson,Leslie,x4065,lthompson@classicmodelcars.com,1,1143,Sales Rep
1188,Firrelli,Julie,x2173,jfirrelli@classicmodelcars.com,2,1143,Sales Rep
1216,Patterson,Steve,x4334,spatterson@classicmodelcars.com,2,1143,Sales Rep


In [0]:
df_employees.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_employees")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_employees;

col_name,data_type,comment
employeeNumber,int,
lastName,string,
firstName,string,
extension,string,
email,string,
officeCode,int,
reportsTo,int,
jobTitle,string,
,,
# Delta Statistics Columns,,


In [0]:
%sql SELECT * FROM classicmodels_dlh.dim_employees LIMIT 5; 

employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,0,President
1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002,VP Sales
1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002,VP Marketing
1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056,Sales Manager (APAC)
1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056,Sale Manager (EMEA)


##### Verify Dimension Tables

In [0]:
%sql
USE classicmodels_dlh;
SHOW TABLES

database,tableName,isTemporary
classicmodels_dlh,dim_customers,False
classicmodels_dlh,dim_date,False
classicmodels_dlh,dim_employees,False
classicmodels_dlh,dim_product,False
classicmodels_dlh,dim_productlines,False
classicmodels_dlh,fact_orders_bronze,False
classicmodels_dlh,fact_orders_silver,False
,display_query_10,True
,display_query_11,True
,display_query_7,True


###  Integrate Reference Data with Real-Time Data


In [0]:
# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_orders_bronze", True) 
dbutils.fs.rm(f"{database_dir}/fact_orders_silver", True) 

True

Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data: 

Bronze Table: Process 'Raw' JSON Data

In [0]:

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_id,employee_id,fact_order_key,my_row_id,order_date_key,order_id,price_each,product_id,quantity_ordered,_rescued_data,receipt_time,source_file
146,1088,1028,1,20040102,10208,176.63,S12_1108,46,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1002,1029,2,20040102,10208,128.42,S12_3148,26,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1102,1030,3,20040102,10208,152.26,S12_3891,20,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1337,1031,4,20040102,10208,117.47,S18_3140,24,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1056,1032,5,20040102,10208,96.81,S18_3259,48,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1286,1033,6,20040102,10208,72.85,S18_4522,45,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1401,1034,7,20040102,10208,122.89,S24_2011,35,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1501,1035,8,20040102,10208,80.54,S24_3151,20,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1401,1036,9,20040102,10208,57.99,S50_1514,30,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json
146,1143,1037,10,20040102,10208,56.67,S700_1138,38,,2024-05-08T23:34:23.318Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04v2.json


In [0]:
# creates the stream, keep running for next steps...
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f9c66bef090>

Silver Table: Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_id,fact_order_key,my_row_id,order_date_key,order_id,price_each,product_id,quantity_ordered,_rescued_data,receipt_time,source_file
146,1028,1,20040102,10208,176.63,S12_1108,46,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1029,2,20040102,10208,128.42,S12_3148,26,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1030,3,20040102,10208,152.26,S12_3891,20,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1031,4,20040102,10208,117.47,S18_3140,24,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1032,5,20040102,10208,96.81,S18_3259,48,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1033,6,20040102,10208,72.85,S18_4522,45,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1034,7,20040102,10208,122.89,S24_2011,35,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1035,8,20040102,10208,80.54,S24_3151,20,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1036,9,20040102,10208,57.99,S50_1514,30,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json
146,1037,10,20040102,10208,56.67,S700_1138,38,,2024-05-08T22:20:37.926Z,dbfs:/FileStore/lab_data/data/stream/orders/df_fact_orders_04.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_id,bigint,
employee_id,bigint,
fact_order_key,bigint,
my_row_id,bigint,
order_date_key,bigint,
order_id,bigint,
price_each,double,
product_id,string,
quantity_ordered,bigint,
_rescued_data,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.order_id,
      o.customer_id,
      c.customer_name AS customer_name,
      o.employee_id,
      e.lastName AS emp_last_name,
      e.firstName AS emp_first_name,
      o.product_id,
      p.product_name,
      p.product_line as productline_key,
      p.buy_price AS product_cost,
      p.MSRP AS product_list_price,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.quantity_ordered AS order_quantity,
      o.price_each as unit_price
  FROM orders_silver_tempview AS o
  INNER JOIN classicmodels_dlh.dim_customers AS c
  ON c.customer_id = o.customer_id
  INNER JOIN classicmodels_dlh.dim_employees AS e
  ON e.employeeNumber = o.employee_id
  INNER JOIN classicmodels_dlh.dim_product AS p
  ON p.product_id = o.product_id
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  
  
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f9c66bed850>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,order_id,customer_id,customer_name,employee_id,emp_last_name,emp_first_name,product_id,product_name,productline_key,product_cost,product_list_price,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,order_quantity,unit_price
1028,10208,146,"Saveley & Henriot, Co.",1088,Patterson,William,S12_1108,2001 Ferrari Enzo,Classic Cars,95.59,207.8,20040102,Friday,2,Weekday,January,1,2004,46,176.63
1029,10208,146,"Saveley & Henriot, Co.",1002,Murphy,Diane,S12_3148,1969 Corvair Monza,Classic Cars,89.14,151.08,20040102,Friday,2,Weekday,January,1,2004,26,128.42
1030,10208,146,"Saveley & Henriot, Co.",1102,Bondur,Gerard,S12_3891,1969 Ford Falcon,Classic Cars,83.05,173.02,20040102,Friday,2,Weekday,January,1,2004,20,152.26
1031,10208,146,"Saveley & Henriot, Co.",1337,Bondur,Loui,S18_3140,1903 Ford Model A,Vintage Cars,68.3,136.59,20040102,Friday,2,Weekday,January,1,2004,24,117.47
1032,10208,146,"Saveley & Henriot, Co.",1056,Patterson,Mary,S18_3259,Collectable Wooden Train,Trains,67.56,100.84,20040102,Friday,2,Weekday,January,1,2004,48,96.81
1033,10208,146,"Saveley & Henriot, Co.",1286,Tseng,Foon Yue,S18_4522,1904 Buick Runabout,Vintage Cars,52.66,87.77,20040102,Friday,2,Weekday,January,1,2004,45,72.85
1034,10208,146,"Saveley & Henriot, Co.",1401,Castillo,Pamela,S24_2011,18th century schooner,Ships,82.34,122.89,20040102,Friday,2,Weekday,January,1,2004,35,122.89
1035,10208,146,"Saveley & Henriot, Co.",1501,Bott,Larry,S24_3151,1912 Ford Model T Delivery Wagon,Vintage Cars,46.91,88.51,20040102,Friday,2,Weekday,January,1,2004,20,80.54
1036,10208,146,"Saveley & Henriot, Co.",1401,Castillo,Pamela,S50_1514,1962 City of Detroit Streetcar,Trains,37.49,58.58,20040102,Friday,2,Weekday,January,1,2004,30,57.99
1037,10208,146,"Saveley & Henriot, Co.",1143,Bow,Anthony,S700_1138,The Schooner Bluenose,Ships,34.0,66.67,20040102,Friday,2,Weekday,January,1,2004,38,56.67


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
order_id,bigint,
customer_id,bigint,
customer_name,string,
employee_id,bigint,
emp_last_name,string,
emp_first_name,string,
product_id,string,
product_name,varchar(65535),
productline_key,varchar(65535),


Gold Table: Perform Aggregations

Create a new Gold table using the CTAS approach. The table should include the number of products sold per customer each Month, along with the Customers' ID, First & Last Name, and the Month in which the order was placed.

In [0]:
# Summarize the count of products ordered by each customer in each month
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT customer_id AS CustomerID
    , customer_name AS Name
    , order_month_name AS OrderMonth
    , COUNT(product_id) AS ProductCount
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerID, Name, OrderMonth
  ORDER BY CustomerID DESC);

SELECT * FROM classicmodels_dlh.fact_monthly_orders_by_customer_gold;

CustomerID,Name,OrderMonth,ProductCount
496,Kelly's Gift Shop,December,18
496,Kelly's Gift Shop,November,9
496,Kelly's Gift Shop,July,13
496,Kelly's Gift Shop,April,8
495,Diecast Collectables,April,2
495,Diecast Collectables,December,16
489,"Double Decker Gift Stores, Ltd",January,3
489,"Double Decker Gift Stores, Ltd",November,9
487,Signal Collectibles Ltd.,February,4
487,Signal Collectibles Ltd.,September,11


In [0]:
# Summarizes the total revenue across each customer's orders 
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT customer_id AS CustomerID
    , customer_name AS Name
    , ROUND(SUM(unit_price*order_quantity)) AS Revenue
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerID, Name
  ORDER BY Revenue DESC);

SELECT * FROM classicmodels_dlh.fact_monthly_orders_by_customer_gold;

CustomerID,Name,Revenue
141,Euro+ Shopping Channel,820690.0
124,Mini Gifts Distributors Ltd.,591827.0
114,"Australian Collectors, Co.",180585.0
151,Muscle Machine Inc,177914.0
119,La Rochelle Gifts,158573.0
148,"Dragon Souveniers, Ltd.",156251.0
323,"Down Under Souveniers, Inc",154622.0
131,Land of Toys Inc.,149085.0
187,"AV Stores, Co.",148410.0
450,The Sharp Gifts Warehouse,143536.0


In [0]:
# Put monthly orders by customers, including the customer's id and name, with also the order month and revenue
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT customer_id AS CustomerID
    , customer_name AS Name
    , order_month_name AS OrderMonth
    , ROUND(SUM(unit_price*order_quantity)) AS Revenue
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerID, Name, OrderMonth
  ORDER BY CustomerID DESC, OrderMonth ASC);

SELECT * FROM classicmodels_dlh.fact_monthly_orders_by_customer_gold;

CustomerID,Name,OrderMonth,Revenue
496,Kelly's Gift Shop,April,30254.0
496,Kelly's Gift Shop,December,52166.0
496,Kelly's Gift Shop,July,32077.0
496,Kelly's Gift Shop,November,22964.0
495,Diecast Collectables,April,6277.0
495,Diecast Collectables,December,59265.0
489,"Double Decker Gift Stores, Ltd",January,7310.0
489,"Double Decker Gift Stores, Ltd",November,22276.0
487,Signal Collectibles Ltd.,February,12573.0
487,Signal Collectibles Ltd.,September,29997.0
