## Imports

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

## Setup / Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002-mysqlserver-pbj7bqf.mysql.database.azure.com"
jdbc_port = 3306
src_database = "project_db"

connection_properties = { 
  "user" : "pjiang123",
  "password" : "Password123",
  "driver" : "org.mariadb.jdbc.Driver"
}
# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "DS2002Cluster0"
atlas_database_name = "final_db"
atlas_user_name = "pbj7bqf"
atlas_password = "2s9ny1FJwGxr9mOI"

# Data Files (JSON) Information ###############################
dst_database = "final_dlh"

base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{base_dir}/stream"

orders_stream_dir = f"{stream_dir}"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

## Essential Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.ycteshw.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.ycteshw.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

## Cold Path Data

In [0]:
%sql
DROP DATABASE IF EXISTS final_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS final_dlh
COMMENT "DS-2002 Final DB"
LOCATION "dbfs:/FileStore/project_data/final_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project DB");

### Source Data from Azure MySQL

#### Date Data

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysqlserver-pbj7bqf.mysql.database.azure.com:3306/project_db", --Replace with your Server Name
  dbtable "dim_date",
  user "pjiang123",    --Replace with your User Name
  password "Password123"  --Replace with you password
)

In [0]:
%sql
USE DATABASE final_dlh;

CREATE OR REPLACE TABLE final_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project_data/final_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED final_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM final_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20030106,2003-01-06,2003/01/06,01/06/2003,06/01/2003,2,Monday,6,6,Weekday,2,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030107,2003-01-07,2003/01/07,01/07/2003,07/01/2003,3,Tuesday,7,7,Weekday,2,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030108,2003-01-08,2003/01/08,01/08/2003,08/01/2003,4,Wednesday,8,8,Weekday,2,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030109,2003-01-09,2003/01/09,01/09/2003,09/01/2003,5,Thursday,9,9,Weekday,2,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030110,2003-01-10,2003/01/10,01/10/2003,10/01/2003,6,Friday,10,10,Weekday,2,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3


## MongoDB Atlas

In [0]:
source_dir = '/dbfs/FileStore/project_data/batch'
json_files = {"customers" : 'dim_customers.json'
              , "employees" : 'dim_employees.json'
              , "mounts" : 'dim_mounts.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

InsertManyResult([ObjectId('663ea10be270e51953e5355e'), ObjectId('663ea10be270e51953e5355f'), ObjectId('663ea10be270e51953e53560'), ObjectId('663ea10be270e51953e53561'), ObjectId('663ea10be270e51953e53562'), ObjectId('663ea10be270e51953e53563'), ObjectId('663ea10be270e51953e53564'), ObjectId('663ea10be270e51953e53565'), ObjectId('663ea10be270e51953e53566'), ObjectId('663ea10be270e51953e53567'), ObjectId('663ea10be270e51953e53568'), ObjectId('663ea10be270e51953e53569'), ObjectId('663ea10be270e51953e5356a'), ObjectId('663ea10be270e51953e5356b'), ObjectId('663ea10be270e51953e5356c'), ObjectId('663ea10be270e51953e5356d'), ObjectId('663ea10be270e51953e5356e'), ObjectId('663ea10be270e51953e5356f'), ObjectId('663ea10be270e51953e53570'), ObjectId('663ea10be270e51953e53571'), ObjectId('663ea10be270e51953e53572'), ObjectId('663ea10be270e51953e53573'), ObjectId('663ea10be270e51953e53574'), ObjectId('663ea10be270e51953e53575'), ObjectId('663ea10be270e51953e53576'), ObjectId('663ea10be270e51953e535

In [0]:
%scala
import com.mongodb.spark._

val userName = "pbj7bqf"
val pwd = "2s9ny1FJwGxr9mOI"
val clusterName = "ds2002cluster0"
val atlas_uri = s"mongodb+srv://$userName:$pwd@ds2002cluster0.ycteshw.mongodb.net/"

### Customer Table

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "final_db")
.option("collection", "customers").load()
.select("customer_key","customerName","phone","addressLine1","city","state","postalCode","country")

display(df_customer)

customer_key,customerName,phone,addressLine1,city,state,postalCode,country
1,Atelier graphique,40.32.2555,"54, rue Royale",Nantes,,44000,France
2,Signal Gift Stores,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA
3,"Australian Collectors, Co.",03 9520 4555,636 St Kilda Road,Melbourne,Victoria,3004,Australia
4,La Rochelle Gifts,40.67.8555,"67, rue des Cinquante Otages",Nantes,,44000,France
5,Baane Mini Imports,07-98 9555,Erling Skakkes gate 78,Stavern,,4110,Norway
6,Mini Gifts Distributors Ltd.,4155551450,5677 Strong St.,San Rafael,CA,97562,USA
7,Havel & Zbyszek Co,(26) 642-7555,ul. Filtrowa 68,Warszawa,,01-012,Poland
8,"Blauer See Auto, Co.",+49 69 66 90 2555,Lyonerstr. 34,Frankfurt,,60528,Germany
9,Mini Wheels Co.,6505555787,5557 North Pendale Street,San Francisco,CA,94217,USA
10,Land of Toys Inc.,2125557818,897 Long Airport Avenue,NYC,NY,10022,USA


In [0]:
%scala
df_customer.printSchema()

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("final_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED final_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
customerName,string,
phone,string,
addressLine1,string,
city,string,
state,string,
postalCode,string,
country,string,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM final_dlh.dim_customer LIMIT 5

customer_key,customerName,phone,addressLine1,city,state,postalCode,country
1,Atelier graphique,40.32.2555,"54, rue Royale",Nantes,,44000,France
2,Signal Gift Stores,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA
3,"Australian Collectors, Co.",03 9520 4555,636 St Kilda Road,Melbourne,Victoria,3004,Australia
4,La Rochelle Gifts,40.67.8555,"67, rue des Cinquante Otages",Nantes,,44000,France
5,Baane Mini Imports,07-98 9555,Erling Skakkes gate 78,Stavern,,4110,Norway


### Employee Table

In [0]:
%scala

val df_employee = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "final_db")
.option("collection", "employees").load()
.select("employee_key", "lastName", "firstName", "email", "jobTitle")

display(df_employee)

employee_key,lastName,firstName,email,jobTitle
1,Murphy,Diane,dmurphy@classicmodelcars.com,President
2,Patterson,Mary,mpatterso@classicmodelcars.com,VP Sales
3,Firrelli,Jeff,jfirrelli@classicmodelcars.com,VP Marketing
4,Patterson,William,wpatterson@classicmodelcars.com,Sales Manager (APAC)
5,Bondur,Gerard,gbondur@classicmodelcars.com,Sale Manager (EMEA)
6,Bow,Anthony,abow@classicmodelcars.com,Sales Manager (NA)
7,Jennings,Leslie,ljennings@classicmodelcars.com,Sales Rep
8,Thompson,Leslie,lthompson@classicmodelcars.com,Sales Rep
9,Firrelli,Julie,jfirrelli@classicmodelcars.com,Sales Rep
10,Patterson,Steve,spatterson@classicmodelcars.com,Sales Rep


In [0]:
%scala
df_employee.printSchema()

In [0]:
%scala
df_employee.write.format("delta").mode("overwrite").saveAsTable("final_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED final_dlh.dim_employee

col_name,data_type,comment
employee_key,int,
lastName,string,
firstName,string,
email,string,
jobTitle,string,
,,
# Delta Statistics Columns,,
Column Names,"email, lastName, firstName, employee_key, jobTitle",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM final_dlh.dim_employee LIMIT 5

employee_key,lastName,firstName,email,jobTitle
1,Murphy,Diane,dmurphy@classicmodelcars.com,President
2,Patterson,Mary,mpatterso@classicmodelcars.com,VP Sales
3,Firrelli,Jeff,jfirrelli@classicmodelcars.com,VP Marketing
4,Patterson,William,wpatterson@classicmodelcars.com,Sales Manager (APAC)
5,Bondur,Gerard,gbondur@classicmodelcars.com,Sale Manager (EMEA)


### Mounts Table

In [0]:
%scala

val df_mount = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "final_db")
.option("collection", "mounts").load()
.select("mount_key", "name", "description", "enhanced_description", "seats", "patch", "tradeable", "product_id", "quantityInStock", "buyPrice", "MSRP")

display(df_mount)

mount_key,name,description,enhanced_description,seats,patch,tradeable,product_id,quantityInStock,buyPrice,MSRP
1,Company Chocobo,Summon your Grand Company-issued battle chocobo.,"Born and bred in the city–state of Ishgard, the majority of company chocobos are geldings of the rouncey variety; however, massive destriers and miniature Belah'dian jennets are also raised to accommodate the builds of Roegadyn and Lalafellin riders respectively.",1,2.0,False,S10_1678,7933,48.81,95.7
2,Goobbue,Use the odd-shaped horn gifted to you by the wandering minstrel to call a massive goobbue to your side.,"Traded for a song by the High Satrap of a distant land, the secret to taming these creatures once thought to be unbreakable was introduced to Eorzea by a mysterious wandering minstrel who journeyed through the realm in the final days before the Calamity.",1,2.0,False,S10_1949,7305,98.58,214.3
3,Legacy Chocobo,Five years and a realm-shaking Calamity were not enough to break the bonds between you and your loyal steed. A single whistle and he will be at your side once again.,"In the five years following your sudden disappearance from the Carteneau Flats, your ever-faithful chocobo spent each waking moment galloping across the realm in search of his lost master. His myriad adventures are nothing less than fantastical and heartbreaking...but that is a story for another day.",1,2.0,False,S10_2016,6625,68.99,118.94
4,Magitek Armor,Call into battle your suit of Garlond Ironworks-modified magitek armor.,"This suit of Garlean-developed reaper-class magitek battle armor has had its original control systems purged and replaced with a new, modified core built by Cid of the Garlond Ironworks.",1,2.0,False,S10_4698,5582,91.02,193.66
5,Coeurl,"Summon your fleet-footed battle coeurl, trained from a pup to recognize you as a friend...not food.","Native to the jungles of the Near East, coeurls were only recently introduced to Eorzea, brought over from faraway lands such as Thavnair by Ul'dahn nobles who thought to breed them as pets. Now, but a handful of skilled tamers living in recluse on the Pearl are said to be able to train them.",1,2.0,False,S10_4757,3252,85.68,136.0
6,Ahriman,"Summon forth a riding ahriman, bound to this realm with powerful magicks guaranteed to never fade.","Though little is known of the void and the creatures which call the extra-dimensional realm home, scholars have discovered that there is a strict hierarchy amongst voidsent consisting of twelve distinct tiers. Ahrimans are believed to fall into the fifth, making them formidable foes...when not enthralled by curses.",1,2.0,False,S10_4962,6791,103.42,147.74
7,Unicorn,Summon forth the single-horned steed you befriended in the Black Shroud.,"Not long ago, Eorzean unicorns numbered in the thousands, and could oft be seen roaming the Coerthas highlands in great majestic herds that would render the mountains white as snow. Poaching has caused the population to decline, and if it were not for your act of kindness, there would be one fewer.",1,2.0,False,S12_1099,68,95.34,194.57
8,Behemoth,"Summon your mighty behemoth, trained from birth to carry you...not trample you underfoot like an ant.","Immediately following the Calamity, an adventurer returning to Gridania from Falcon's Nest in Coerthas claims to have happened upon a den of baby behemoths who had lost their mother. Despite objections from friends and family alike, she kept the beasts, eventually training them to serve as mounts.",1,2.1,False,S12_1108,3619,95.59,207.8
9,Cavalry Drake,"Summon your cavalry drake, trained by the drake whisperer of the Brotherhood of Ash.","Training a cavalry drake is a long and arduous process that begins the moment the scalekin hatches. Never allowed to see its mother, the creature is raised solely in the presence of a drake whisperer while being weaned on the incendiary glands of aged battle drakes to ensure it is both submissive and deadly.",1,2.1,False,S12_1666,1579,77.9,136.67
10,Laurel Goobbue,"Summon your laurel goobbue, towering friend to the sylphs.","After having the moss and flowers growing upon his head pulled up and tossed aside like common weeds by a band of sylvan ne'er-do-wells, this quiet-mannered goobbue was taken in and cared for by the sylphs of Little Solace whose elders crafted for him a magnificent laurel with which to hide his scars.",1,2.1,False,S12_2823,9997,66.27,150.62


In [0]:
%scala
df_mount.printSchema()

In [0]:
%scala
df_mount.write.format("delta").mode("overwrite").saveAsTable("final_dlh.dim_mount")

In [0]:
%sql
DESCRIBE EXTENDED final_dlh.dim_mount

col_name,data_type,comment
mount_key,int,
name,string,
description,string,
enhanced_description,string,
seats,int,
patch,string,
tradeable,string,
product_id,string,
quantityInStock,int,
buyPrice,double,


In [0]:
%sql
SELECT * FROM final_dlh.dim_mount LIMIT 5

mount_key,name,description,enhanced_description,seats,patch,tradeable,product_id,quantityInStock,buyPrice,MSRP
1,Company Chocobo,Summon your Grand Company-issued battle chocobo.,"Born and bred in the city–state of Ishgard, the majority of company chocobos are geldings of the rouncey variety; however, massive destriers and miniature Belah'dian jennets are also raised to accommodate the builds of Roegadyn and Lalafellin riders respectively.",1,2.0,False,S10_1678,7933,48.81,95.7
2,Goobbue,Use the odd-shaped horn gifted to you by the wandering minstrel to call a massive goobbue to your side.,"Traded for a song by the High Satrap of a distant land, the secret to taming these creatures once thought to be unbreakable was introduced to Eorzea by a mysterious wandering minstrel who journeyed through the realm in the final days before the Calamity.",1,2.0,False,S10_1949,7305,98.58,214.3
3,Legacy Chocobo,Five years and a realm-shaking Calamity were not enough to break the bonds between you and your loyal steed. A single whistle and he will be at your side once again.,"In the five years following your sudden disappearance from the Carteneau Flats, your ever-faithful chocobo spent each waking moment galloping across the realm in search of his lost master. His myriad adventures are nothing less than fantastical and heartbreaking...but that is a story for another day.",1,2.0,False,S10_2016,6625,68.99,118.94
4,Magitek Armor,Call into battle your suit of Garlond Ironworks-modified magitek armor.,"This suit of Garlean-developed reaper-class magitek battle armor has had its original control systems purged and replaced with a new, modified core built by Cid of the Garlond Ironworks.",1,2.0,False,S10_4698,5582,91.02,193.66
5,Coeurl,"Summon your fleet-footed battle coeurl, trained from a pup to recognize you as a friend...not food.","Native to the jungles of the Near East, coeurls were only recently introduced to Eorzea, brought over from faraway lands such as Thavnair by Ul'dahn nobles who thought to breed them as pets. Now, but a handful of skilled tamers living in recluse on the Pearl are said to be able to train them.",1,2.0,False,S10_4757,3252,85.68,136.0


In [0]:
%sql
USE final_dlh;
SHOW TABLES

database,tableName,isTemporary
final_dlh,dim_customer,False
final_dlh,dim_date,False
final_dlh,dim_employee,False
final_dlh,dim_mount,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True
,display_query_6,True


## Autoloader (Hot path)

### Bronze Table

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,fact_order_key,mount_key,orderDate_key,orderNumber,priceEach,quantityOrdered,shippedDate_key,status,_rescued_data,receipt_time,source_file
16,1000,49,20030603,10127,126.39,25,20030606.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
16,1001,49,20041201,10349,140.75,34,20041203.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
74,1002,49,20040505,10247,143.62,48,20040508.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
68,1003,49,20031114,10185,127.82,39,20031120.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
73,1004,49,20030925,10152,117.77,35,20031001.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
43,1005,49,20041022,10314,129.26,29,20041023.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
26,1006,49,20050505,10413,133.57,49,20050509.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
93,1007,49,20031106,10176,140.75,36,20031112.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
105,1008,49,20031126,10196,126.39,27,20031201.0,Shipped,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json
83,1009,49,20040616,10260,137.88,23,0.0,Cancelled,,2024-05-10T22:36:03.645Z,dbfs:/FileStore/project_data/stream/fact_orders2.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7faee6f3db90>

### Silver Table

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customer_key,fact_order_key,mount_key,orderDate_key,orderNumber,priceEach,quantityOrdered,shippedDate_key,status,_rescued_data,receipt_time,source_file
86,1,23,20030106,10100,136.0,30,20030110.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
11,2,23,20050210,10379,156.4,39,20050211.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
57,3,23,20031105,10173,168.3,24,20031109.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
118,4,23,20041117,10331,154.7,44,20041123.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
30,5,23,20030318,10110,153.0,42,20030320.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
6,6,23,20031112,10182,159.8,44,20031118.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
6,7,23,20041021,10312,146.2,48,20041023.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
80,8,23,20041125,10344,168.3,45,20041129.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
2,9,23,20030521,10124,153.0,21,20030525.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json
107,10,23,20040126,10214,166.6,30,20040129.0,Shipped,,2024-05-10T22:36:25.388Z,dbfs:/FileStore/project_data/stream/fact_orders1.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_order_key,bigint,
mount_key,bigint,
orderDate_key,bigint,
orderNumber,bigint,
priceEach,double,
quantityOrdered,bigint,
shippedDate_key,double,
status,string,
_rescued_data,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.orderNumber,
      o.customer_key,
      c.customerName,
      c.phone,
      c.addressLine1,
      c.city,
      c.state,
      c.postalCode,
      c.country,
      o.mount_key,
      p.product_id,
      p.name,
      p.description,
      p.seats,
      p.patch,
      p.tradeable,
      p.quantityInStock,
      p.buyPrice,
      p.MSRP,
      o.orderDate_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.shippedDate_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,
      o.quantityOrdered,
      o.priceEach,
      o.status
  FROM orders_silver_tempview AS o
  INNER JOIN final_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN final_dlh.dim_mount AS p
  ON p.mount_key = o.mount_key
  LEFT OUTER JOIN final_dlh.dim_date AS od
  ON od.date_key = o.orderDate_key
  LEFT OUTER JOIN final_dlh.dim_date AS sd
  ON sd.date_key = o.shippedDate_key
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7faee6e210d0>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,orderNumber,customer_key,customerName,phone,addressLine1,city,state,postalCode,country,mount_key,product_id,name,description,seats,patch,tradeable,quantityInStock,buyPrice,MSRP,orderDate_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,shippedDate_key,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_calendar_quarter,shipped_calendar_year,quantityOrdered,priceEach,status
1,10100,86,Online Diecast Creations Co.,6035558647,2304 Long Airport Avenue,Nashua,NH,62005,USA,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20030106,Monday,6,Weekday,January,1,2003,20030110.0,Friday,10.0,Weekday,January,1.0,2003.0,30,136.0,Shipped
2,10379,11,Euro+ Shopping Channel,(91) 555 94 44,"C/ Moralzarzal, 86",Madrid,,28034,Spain,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20050210,Thursday,10,Weekday,February,1,2005,20050211.0,Friday,11.0,Weekday,February,1.0,2005.0,39,156.4,Shipped
3,10173,57,Rovelli Gifts,035-640555,Via Ludovico il Moro 22,Bergamo,,24100,Italy,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20031105,Wednesday,5,Weekday,November,4,2003,20031109.0,Sunday,9.0,Weekend,November,4.0,2003.0,24,168.3,Shipped
4,10331,118,Motor Mint Distributors Inc.,2155559857,11328 Douglas Av.,Philadelphia,PA,71270,USA,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20041117,Wednesday,17,Weekday,November,4,2004,20041123.0,Tuesday,23.0,Weekday,November,4.0,2004.0,44,154.7,Shipped
5,10110,30,"AV Stores, Co.",(171) 555-1555,Fauntleroy Circus,Manchester,,EC2 5NT,UK,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20030318,Tuesday,18,Weekday,March,1,2003,20030320.0,Thursday,20.0,Weekday,March,1.0,2003.0,42,153.0,Shipped
6,10182,6,Mini Gifts Distributors Ltd.,4155551450,5677 Strong St.,San Rafael,CA,97562,USA,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20031112,Wednesday,12,Weekday,November,4,2003,20031118.0,Tuesday,18.0,Weekday,November,4.0,2003.0,44,159.8,Shipped
7,10312,6,Mini Gifts Distributors Ltd.,4155551450,5677 Strong St.,San Rafael,CA,97562,USA,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20041021,Thursday,21,Weekday,October,4,2004,20041023.0,Saturday,23.0,Weekend,October,4.0,2004.0,48,146.2,Shipped
8,10344,80,Marseille Mini Autos,91.24.4555,"12, rue des Bouchers",Marseille,,13008,France,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20041125,Thursday,25,Weekday,November,4,2004,20041129.0,Monday,29.0,Weekday,November,4.0,2004.0,45,168.3,Shipped
9,10124,2,Signal Gift Stores,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20030521,Wednesday,21,Weekday,May,2,2003,20030525.0,Sunday,25.0,Weekend,May,2.0,2003.0,21,153.0,Shipped
10,10214,107,"Corrida Auto Replicas, Ltd",(91) 555 22 82,"C/ Araquil, 67",Madrid,,28023,Spain,23,S18_1749,Direwolf,"Summon forth your deadly direwolf, one of five pups found huddling together in the snow near the corpse of their slain mother.",1,2.35,False,2724,86.7,170.0,20040126,Monday,26,Weekday,January,1,2004,20040129.0,Thursday,29.0,Weekday,January,1.0,2004.0,30,166.6,Shipped


In [0]:
%sql
DESCRIBE EXTENDED final_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
orderNumber,bigint,
customer_key,bigint,
customerName,string,
phone,string,
addressLine1,string,
city,string,
state,string,
postalCode,string,
country,string,


### Gold Table

In [0]:
%sql
CREATE OR REPLACE TABLE final_dlh.fact_most_expensive_order_by_customer_gold AS (
  SELECT `customerName`,
  SUM(`quantityOrdered`) AS total_quantity_ordered,
  SUM(`priceEach`) AS total_price,
  MAX(`orderDate_key`) AS latest_order,
  MAX(`MSRP`) AS most_expensive_mount
  FROM final_dlh.fact_orders_silver AS fo
  GROUP BY `customerName`
  ORDER BY SUM(`priceEach`) DESC
);

SELECT * FROM final_dlh.fact_most_expensive_order_by_customer_gold;

customerName,total_quantity_ordered,total_price,latest_order,most_expensive_mount
Euro+ Shopping Channel,9327,22680.0,20050531,214.3
Mini Gifts Distributors Ltd.,6366,16746.58,20050529,214.3
"Australian Collectors, Co.",1926,5159.19,20041129,214.3
Muscle Machine Inc,1775,4800.74,20041201,207.8
Land of Toys Inc.,1631,4575.969999999999,20041115,194.57
La Rochelle Gifts,1832,4554.76,20050531,194.57
"Dragon Souveniers, Ltd.",1524,4372.499999999999,20050302,214.3
"Anna's Decorations, Ltd",1469,4314.9400000000005,20050309,214.3
"AV Stores, Co.",1778,4297.810000000001,20041117,207.8
"Down Under Souveniers, Inc",1691,4189.179999999999,20050408,194.57
