## DS 2002 Final Project
Caroline Sullivan

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "wmr6ku-mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels_dw"

connection_properties = {
  "user" : "wmr6ku",
  "password" : "Oakley2017good123!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.5nhij"
atlas_database_name = "classicmodels_dw"
atlas_user_name = "wmr6ku"
atlas_password = "Oakley2017good123!"

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dlh"

base_dir = "dbfs:/FileStore/ds2002_final_project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/classicmodels"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

False

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS 2002 Final Project Database"
LOCATION "dbfs:/FileStore/ds2002_final_project_data/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS 2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wmr6ku-mysql.mysql.database.azure.com:3306/classicmodels_dw", 
  dbtable "dim_date",
  user "wmr6ku",    
  password "Oakley2017good123!"  
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;

CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002_final_project_data/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL database.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wmr6ku-mysql.mysql.database.azure.com:3306/classicmodels_dw", 
  dbtable "dim_products",
  user "wmr6ku", 
  password "Oakley2017good123!" 
)

In [0]:
%sql
USE DATABASE classicmodels_dlh;
CREATE OR REPLACE TABLE classicmodels_dlh.dim_product
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/ds2002_final_project_data/classicmodels_dlh/dim_product"
AS SELECT * FROM view_product
-- Create a new table named "classicmodels_dlh.dim_product" using data from the view named "view_product"

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_product;

col_name,data_type,comment
product_key,bigint,
productCode,varchar(65535),
productName,varchar(65535),
productLine,varchar(65535),
productScale,varchar(65535),
productVendor,varchar(65535),
productDescription,varchar(65535),
quantityInStock,bigint,
buyPrice,double,
MSRP,double,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_product LIMIT 5

product_key,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front suspension, gear-shift lever, footbrake lever, drive chain, wheels and steering. All parts are particularly delicate due to their precise scale and require special care and attention.",7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddle bags located on side of motorcycle, detailed engine, working steering, working suspension, two leather seats, luggage rack, dual exhaust pipes, small saddle bag located on handle bars, two-tone paint with chrome accents, superior die-cast detail , rotating wheels , working kick stand, diecast metal with plastic parts and baked enamel finish.",6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos and insignias, detachable rear wheelie bar, heavy diecast metal with resin parts, authentic multi-color tampo-printed graphics, separate engine drive belts, free-turning front fork, rotating tires and rear racing slick, certificate of authenticity, detailed engine, display stand , precision diecast replica, baked enamel finish, 1:10 scale model, removable fender, seat and tank cover piece for displaying the superior detail of the v-twin engine",5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/ds2002_final_project_data/classicmodels/batch'

path,name,size,modificationTime
dbfs:/FileStore/ds2002_final_project_data/classicmodels/batch/classicmodels_dimcustomers.json,classicmodels_dimcustomers.json,50028,1733682704000
dbfs:/FileStore/ds2002_final_project_data/classicmodels/batch/classicmodels_dimemployees.csv,classicmodels_dimemployees.csv,1860,1733682704000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002_final_project_data/classicmodels/batch'
json_files = {"customers" : 'classicmodels_dimcustomers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f5d4e723c80>

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%python
%pip install pymongo[srv]

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%scala
import com.mongodb.spark._

val userName = "wmr6ku" 
val pwd = "Oakley2017good123!"
val clusterName = "Cluster0.5nhij"
val atlas_uri = s"mongodb+srv://wmr6ku:Oakley2017good123!@cluster0.5nhij.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "classicmodels_dw")
.option("collection", "customers").load()
.select("customer_key","customerNumber", "customerName", "contactLastName", "contactFirstName", "phone", "addressLine1", "city", "state", "postalCode", "country")

display(df_customer)

customer_key,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,city,state,postalCode,country
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France
2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Melbourne,Victoria,3004,Australia
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",Nantes,,44000,France
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,Stavern,,4110,Norway
6,124,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,San Rafael,CA,97562,USA
7,125,Havel & Zbyszek Co,Piestrzeniewicz,Zbyszek,(26) 642-7555,ul. Filtrowa 68,Warszawa,,01-012,Poland
8,128,"Blauer See Auto, Co.",Keitel,Roland,+49 69 66 90 2555,Lyonerstr. 34,Frankfurt,,60528,Germany
9,129,Mini Wheels Co.,Murphy,Julie,6505555787,5557 North Pendale Street,San Francisco,CA,94217,USA
10,131,Land of Toys Inc.,Lee,Kwai,2125557818,897 Long Airport Avenue,NYC,NY,10022,USA


In [0]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database 

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
customerNumber,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
phone,string,
addressLine1,string,
city,string,
state,string,
postalCode,string,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_customer LIMIT 5

customer_key,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,city,state,postalCode,country
1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France
2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA
3,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Melbourne,Victoria,3004,Australia
4,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",Nantes,,44000,France
5,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,Stavern,,4110,Norway


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
employee_csv = f"{batch_dir}/classicmodels_dimemployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employee_key,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
3,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
4,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
5,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)
6,1143,Bow,Anthony,x5428,abow@classicmodelcars.com,1,1056.0,Sales Manager (NA)
7,1165,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143.0,Sales Rep
8,1166,Thompson,Leslie,x4065,lthompson@classicmodelcars.com,1,1143.0,Sales Rep
9,1188,Firrelli,Julie,x2173,jfirrelli@classicmodelcars.com,2,1143.0,Sales Rep
10,1216,Patterson,Steve,x4334,spatterson@classicmodelcars.com,2,1143.0,Sales Rep


In [0]:
df_employee.printSchema()

root
 |-- employee_key: integer (nullable = true)
 |-- employeeNumber: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- email: string (nullable = true)
 |-- officeCode: integer (nullable = true)
 |-- reportsTo: string (nullable = true)
 |-- jobTitle: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.dim_employee;

col_name,data_type,comment
employee_key,int,
employeeNumber,int,
lastName,string,
firstName,string,
extension,string,
email,string,
officeCode,int,
reportsTo,string,
jobTitle,string,
,,


In [0]:
%sql
SELECT * FROM classicmodels_dlh.dim_employee LIMIT 5;

employee_key,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
3,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
4,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
5,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)


##### 3.2 Verify Dimension Tables

In [0]:
%sql
USE classicmodels_dlh;
SHOW TABLES

database,tableName,isTemporary
classicmodels_dlh,dim_customer,False
classicmodels_dlh,dim_date,False
classicmodels_dlh,dim_employee,False
classicmodels_dlh,dim_product,False
,_sqldf,True
,view_date,True
,view_product,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,fact_order_key,orderNumber,order_date_key,priceEach,product_key,quantityOrdered,shipped_date_key,_rescued_data,receipt_time,source_file
66,1999,10221,20040218,133.86,39,33,20040219.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
66,2000,10273,20040721,117.47,39,40,20040722.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
47,2001,10232,20040320,133.86,39,22,20040325.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
94,2002,10372,20050126,131.13,39,28,20050128.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
85,2003,10414,20050506,128.39,39,41,,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
50,2004,10293,20040909,110.64,39,24,20040914.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
5,2005,10325,20041105,44.37,88,38,20041108.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
11,2006,10104,20030131,51.95,88,35,20030201.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
11,2007,10246,20040505,45.45,88,35,20040506.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json
11,2008,10383,20050222,53.57,88,32,20050225.0,,2024-12-08T18:38:18.604Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders3.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5d10cd1f90>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_key,fact_order_key,orderNumber,order_date_key,priceEach,product_key,quantityOrdered,shipped_date_key,_rescued_data,receipt_time,source_file
86,1,10100,20030106,136.0,23,30,20030110.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
11,2,10379,20050210,156.4,23,39,20050211.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
57,3,10173,20031105,168.3,23,24,20031109.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
118,4,10331,20041117,154.7,23,44,20041123.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
30,5,10110,20030318,153.0,23,42,20030320.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
6,6,10182,20031112,159.8,23,44,20031118.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
6,7,10312,20041021,146.2,23,48,20041023.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
80,8,10344,20041125,168.3,23,45,20041129.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
2,9,10124,20030521,153.0,23,21,20030525.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json
107,10,10214,20040126,166.6,23,30,20040129.0,,2024-12-08T18:38:52.275Z,dbfs:/FileStore/ds2002_final_project_data/classicmodels/stream/classicmodels_factorders1.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_order_key,bigint,
orderNumber,bigint,
order_date_key,bigint,
priceEach,double,
product_key,bigint,
quantityOrdered,bigint,
shipped_date_key,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.orderNumber,
      o.product_key,
      p.productCode,
      p.productName, 
      p.productLine,
      p.buyPrice,
      o.customer_key,
      c.customerNumber,
      c.customerName,
      c.contactLastName,
      c.contactFirstName,
      o.shipped_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_quarter,
      sd.calendar_year AS shipped_year,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.priceEach,
      o.quantityOrdered
  FROM orders_silver_tempview AS o
  INNER JOIN classicmodels_dlh.dim_product AS p
  ON p.product_key = o.product_key
  INNER JOIN classicmodels_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN classicmodels_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
  INNER JOIN classicmodels_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5d24b27150>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,orderNumber,product_key,productCode,productName,productLine,buyPrice,customer_key,customerNumber,customerName,contactLastName,contactFirstName,shipped_date_key,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_quarter,shipped_year,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,priceEach,quantityOrdered
1,10100,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,86,363,Online Diecast Creations Co.,Young,Dorothy,20030110,Friday,10,Weekday,January,1,2003,20030106,Monday,6,Weekday,January,1,2003,136.0,30
2,10379,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,11,141,Euro+ Shopping Channel,Freyre,Diego,20050211,Friday,11,Weekday,February,1,2005,20050210,Thursday,10,Weekday,February,1,2005,156.4,39
3,10173,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,57,278,Rovelli Gifts,Rovelli,Giovanni,20031109,Sunday,9,Weekend,November,4,2003,20031105,Wednesday,5,Weekday,November,4,2003,168.3,24
4,10331,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,118,486,Motor Mint Distributors Inc.,Salazar,Rosa,20041123,Tuesday,23,Weekday,November,4,2004,20041117,Wednesday,17,Weekday,November,4,2004,154.7,44
5,10110,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,30,187,"AV Stores, Co.",Ashworth,Rachel,20030320,Thursday,20,Weekday,March,1,2003,20030318,Tuesday,18,Weekday,March,1,2003,153.0,42
6,10182,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,6,124,Mini Gifts Distributors Ltd.,Nelson,Susan,20031118,Tuesday,18,Weekday,November,4,2003,20031112,Wednesday,12,Weekday,November,4,2003,159.8,44
7,10312,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,6,124,Mini Gifts Distributors Ltd.,Nelson,Susan,20041023,Saturday,23,Weekend,October,4,2004,20041021,Thursday,21,Weekday,October,4,2004,146.2,48
8,10344,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,80,350,Marseille Mini Autos,Lebihan,Laurence,20041129,Monday,29,Weekday,November,4,2004,20041125,Thursday,25,Weekday,November,4,2004,168.3,45
9,10124,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,2,112,Signal Gift Stores,King,Jean,20030525,Sunday,25,Weekend,May,2,2003,20030521,Wednesday,21,Weekday,May,2,2003,153.0,21
10,10214,23,S18_1749,1917 Grand Touring Sedan,Vintage Cars,86.7,107,458,"Corrida Auto Replicas, Ltd",Sommer,Martín,20040129,Thursday,29,Weekday,January,1,2004,20040126,Monday,26,Weekday,January,1,2004,166.6,30


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
orderNumber,bigint,
product_key,bigint,
productCode,varchar(65535),
productName,varchar(65535),
productLine,varchar(65535),
buyPrice,double,
customer_key,bigint,
customerNumber,int,
customerName,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. 

In [0]:
%sql
CREATE OR REPLACE TABLE classicmodels_dlh.fact_orders_gold AS (
  SELECT customer_key AS CustomerID
    , productLine AS Product
    , customerName AS Customer
    , SUM(quantityOrdered) AS TotalQuantityOrdered
    , COUNT(product_key) AS ProductCount
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY productLine, customer_key, customerName
  ORDER BY ProductCount DESC);

SELECT * FROM classicmodels_dlh.fact_orders_gold;

CustomerID,Product,Customer,TotalQuantityOrdered,ProductCount
11,Classic Cars,Euro+ Shopping Channel,1761,52
6,Classic Cars,Mini Gifts Distributors Ltd.,1279,35
6,Vintage Cars,Mini Gifts Distributors Ltd.,1219,34
11,Vintage Cars,Euro+ Shopping Channel,888,27
61,Classic Cars,"Vida Sport, Ltd",794,23
16,Classic Cars,Muscle Machine Inc,649,18
53,Classic Cars,"Toms Spezialitäten, Ltd",558,17
56,Classic Cars,"Anna's Decorations, Ltd",526,17
91,Classic Cars,Salzburg Collectables,595,16
93,Classic Cars,L'ordine Souveniers,501,15


#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/ds2002_final_project_data/