# Project 2: Capstone

## Pre-Processing Steps
#### Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "<server_name>.mysql.database.azure.com"
jdbc_port = 3306
src_database = "adventureworks_dw"

connection_properties = {
  "user" : "user_name",
  "password" : "password",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002.zsrbu"
atlas_database_name = "adventureworks_dw"
atlas_user_name = "nkr7fg"
atlas_password = "Daufuskie1"

# Data Files (JSON) Information ###############################
dst_database = "adventureworks_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

purchase_orders_stream_dir = f"{stream_dir}/aw_purchase_orders"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

## Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS adventureworks_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS adventureworks_dlh
COMMENT "DS-2002 Capstone AdventureWorks Database"
LOCATION "dbfs:/FileStore/lab_data/adventureworks_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-mysql.mysql.database.azure.com:3306/adventureworks_dw", 
  dbtable "dim_date",
  user "bhowe",    
  password "Daufuskie1"  
)

In [0]:
%sql
USE DATABASE adventureworks_dlh;

CREATE OR REPLACE TABLE adventureworks_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/adventureworks_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM adventureworks_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-mysql.mysql.database.azure.com:3306/adventureworks_dw", 
  dbtable "dim_products",
  user "bhowe",   
  password "Daufuskie1" 
)

In [0]:
%sql
USE DATABASE adventureworks_dlh;

CREATE OR REPLACE TABLE adventureworks_dlh.dim_product
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/adventureworks_dlh/dim_product"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM adventureworks_dlh.dim_product LIMIT 5

ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
1,Adjustable Race,AR-5381,False,False,,1000,750,0.0,0.0,,,,,0,,,,,,,1998-06-01T00:00:00Z,,
2,Bearing Ball,BA-8327,False,False,,1000,750,0.0,0.0,,,,,0,,,,,,,1998-06-01T00:00:00Z,,
3,BB Ball Bearing,BE-2349,True,False,,800,600,0.0,0.0,,,,,1,,,,,,,1998-06-01T00:00:00Z,,
4,Headset Ball Bearings,BE-2908,False,False,,800,600,0.0,0.0,,,,,0,,,,,,,1998-06-01T00:00:00Z,,
316,Blade,BL-2036,True,False,,800,600,0.0,0.0,,,,,1,,,,,,,1998-06-01T00:00:00Z,,


#### Fetch Reference Data from a MongoDB Atlas Database
##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/adventureworks_dw_dim_customer.json,adventureworks_dw_dim_customer.json,478186,1746754194000
dbfs:/FileStore/lab_data/retail/batch/adventureworks_dw_dim_employee.csv,adventureworks_dw_dim_employee.csv,53785,1746755556000
dbfs:/FileStore/lab_data/retail/batch/adventureworks_dw_dim_vendors.csv,adventureworks_dw_dim_vendors.csv,11849,1746755787000
dbfs:/FileStore/lab_data/retail/batch/adventureworks_dw_dim_vendors.json,adventureworks_dw_dim_vendors.json,39705,1746809107000


##### Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {"vendors" : 'adventureworks_dw_dim_vendors.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f7c41e98600>

##### Fetch Vendor Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "nkr7fg"
val pwd = "Daufuskie1"
val clusterName = "ds2002.zsrbu"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_vendor = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "adventureworks_dw")
.option("collection", "vendors").load()
.select("VendorID","AccountNumber","Name","CreditRating","PreferredVendorStatus","ActiveFlag","AddressType","AddressLine1","City","StateProvinceCode","State_Province","PostalCode")

display(df_vendor)

VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,City,StateProvinceCode,State_Province,PostalCode
1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,Salt Lake City,UT,Utah,84101
2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,Tacoma,WA,Washington,98403
3,PREMIER0001,"Premier Sport, Inc.",1,1,1,Main Office,7682 Fern Leaf Lane,Boston,MA,Massachusetts,2113
4,COMFORT0001,Comfort Road Bicycles,1,1,1,Main Office,7651 Smiling Tree Court,Los Angeles,CA,California,90012
5,METROSP0001,Metro Sport Equipment,1,1,1,Main Office,60 Oakgrove Rd.,Lebanon,OR,Oregon,97355
6,GREENLA0001,Green Lake Bike Company,1,1,1,Main Office,2342 Peachwillow,Denver,CO,Colorado,80203
7,MOUNTAIN0001,Mountain Works,1,0,1,Main Office,8 Rogers Ave.,Everett,WA,Washington,98201
8,CONTINEN0001,Continental Pro Cycles,3,1,1,Main Office,2 Lion Circle,Long Beach,CA,California,90802
9,ADATUM0001,A. Datum Corporation,1,1,1,Main Office,2596 Big Canyon Road,New York,NY,New York,10007
10,TREYRE0001,Trey Research,3,1,1,Main Office,1874 Valley Blvd.,Palo Alto,CA,California,94303


In [0]:
%scala
df_vendor.printSchema()

##### Use the Spark DataFrame to Create a New Vendor Dimension Table in the Databricks Metadata Database (adventureworks_dlh)

In [0]:
%scala
df_vendor.write.format("delta").mode("overwrite").saveAsTable("adventureworks_dlh.dim_vendor")

In [0]:
%sql
DESCRIBE EXTENDED adventureworks_dlh.dim_vendor

col_name,data_type,comment
VendorID,int,
AccountNumber,string,
Name,string,
CreditRating,int,
PreferredVendorStatus,string,
ActiveFlag,string,
AddressType,string,
AddressLine1,string,
City,string,
StateProvinceCode,string,


In [0]:
%sql
SELECT * FROM adventureworks_dlh.dim_vendor LIMIT 5

VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,City,StateProvinceCode,State_Province,PostalCode
1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,Salt Lake City,UT,Utah,84101
2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,Tacoma,WA,Washington,98403
3,PREMIER0001,"Premier Sport, Inc.",1,1,1,Main Office,7682 Fern Leaf Lane,Boston,MA,Massachusetts,2113
4,COMFORT0001,Comfort Road Bicycles,1,1,1,Main Office,7651 Smiling Tree Court,Los Angeles,CA,California,90012
5,METROSP0001,Metro Sport Equipment,1,1,1,Main Office,60 Oakgrove Rd.,Lebanon,OR,Oregon,97355


#### Fetch Data from a File System
##### Use PySpark to Read Employee Dimension Data From a CSV File

In [0]:
employee_csv = f"{batch_dir}/adventureworks_dw_dim_employee.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15T00:00:00Z,M,M,1996-07-31T00:00:00Z,0,21,30,1
2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03T00:00:00Z,S,M,1997-02-26T00:00:00Z,0,42,41,1
3,509647174,adventure-works\roberto0,12.0,Roberto,,Tamburello,Engineering Manager,roberto0@adventure-works.com,0,212-555-0187,1964-12-13T00:00:00Z,M,M,1997-12-12T00:00:00Z,1,2,21,1
4,112457891,adventure-works\rob0,3.0,Rob,,Walters,Senior Tool Designer,rob0@adventure-works.com,0,612-555-0100,1965-01-23T00:00:00Z,S,M,1998-01-05T00:00:00Z,0,48,80,1
5,480168528,adventure-works\thierry0,263.0,Thierry,B,D'Hers,Tool Designer,thierry0@adventure-works.com,2,168-555-0183,1949-08-29T00:00:00Z,M,M,1998-01-11T00:00:00Z,0,9,24,1
6,24756624,adventure-works\david0,109.0,David,M,Bradley,Marketing Manager,david0@adventure-works.com,1,913-555-0172,1965-04-19T00:00:00Z,S,M,1998-01-20T00:00:00Z,1,40,40,1
7,309738752,adventure-works\jolynn0,21.0,JoLynn,M,Dobney,Production Supervisor - WC60,jolynn0@adventure-works.com,1,903-555-0145,1946-02-16T00:00:00Z,S,F,1998-01-26T00:00:00Z,0,82,61,1
8,690627818,adventure-works\ruth0,185.0,Ruth,Ann,Ellerbrock,Production Technician - WC10,ruth0@adventure-works.com,0,145-555-0130,1946-07-06T00:00:00Z,M,F,1998-02-06T00:00:00Z,0,83,61,1
9,695256908,adventure-works\gail0,3.0,Gail,A,Erickson,Design Engineer,gail0@adventure-works.com,0,849-555-0139,1942-10-29T00:00:00Z,M,F,1998-02-06T00:00:00Z,1,5,22,1
10,912265825,adventure-works\barry0,185.0,Barry,K,Johnson,Production Technician - WC10,barry0@adventure-works.com,0,206-555-0180,1946-04-27T00:00:00Z,S,M,1998-02-07T00:00:00Z,0,88,64,1


In [0]:
df_employee.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- NationalIDNumber: integer (nullable = true)
 |-- LoginID: string (nullable = true)
 |-- ManagerID: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- EmailPromotion: integer (nullable = true)
 |-- Phone: string (nullable = true)
 |-- BirthDate: timestamp (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HireDate: timestamp (nullable = true)
 |-- SalariedFlag: integer (nullable = true)
 |-- VacationHours: integer (nullable = true)
 |-- SickLeaveHours: integer (nullable = true)
 |-- CurrentFlag: integer (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("adventureworks_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED adventureworks_dlh.dim_employee;

col_name,data_type,comment
EmployeeID,int,
NationalIDNumber,int,
LoginID,string,
ManagerID,string,
FirstName,string,
MiddleName,string,
LastName,string,
Title,string,
EmailAddress,string,
EmailPromotion,int,


In [0]:
%sql
SELECT * FROM adventureworks_dlh.dim_employee LIMIT 5;

EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
1,14417807,adventure-works\guy1,16,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15T00:00:00Z,M,M,1996-07-31T00:00:00Z,0,21,30,1
2,253022876,adventure-works\kevin0,6,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03T00:00:00Z,S,M,1997-02-26T00:00:00Z,0,42,41,1
3,509647174,adventure-works\roberto0,12,Roberto,,Tamburello,Engineering Manager,roberto0@adventure-works.com,0,212-555-0187,1964-12-13T00:00:00Z,M,M,1997-12-12T00:00:00Z,1,2,21,1
4,112457891,adventure-works\rob0,3,Rob,,Walters,Senior Tool Designer,rob0@adventure-works.com,0,612-555-0100,1965-01-23T00:00:00Z,S,M,1998-01-05T00:00:00Z,0,48,80,1
5,480168528,adventure-works\thierry0,263,Thierry,B,D'Hers,Tool Designer,thierry0@adventure-works.com,2,168-555-0183,1949-08-29T00:00:00Z,M,M,1998-01-11T00:00:00Z,0,9,24,1


#### Use AutoLoader to Process Streaming (Hot Path) Purchase Orders Fact Data
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
files = [
    ("fact_purchases01.json", "fact_purchases01_fixed.json"),
    ("fact_purchases02.json", "fact_purchases02_fixed.json"),
    ("fact_purchases03.json", "fact_purchases03_fixed.json")
]

base_path = "/dbfs/FileStore/lab_data/retail/stream/aw_purchase_orders/"

for infile, outfile in files:
    with open(base_path + infile) as f_in:
        data = json.load(f_in)  

    with open(base_path + outfile, "w") as f_out:
        for record in data:
            f_out.write(json.dumps(record) + "\n")


In [0]:
spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", purchase_orders_output_bronze) \
  .load(purchase_orders_stream_dir) \
  .createOrReplaceTempView("purchase_orders_bronze_tempview")

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW purchase_orders_enriched_tempview AS
SELECT 
  *,
  current_timestamp() AS row_ingestion_timestamp,
  input_file_name() AS row_source_file
FROM purchase_orders_bronze_tempview;

In [0]:
%sql
SELECT * FROM purchase_orders_bronze_tempview

DueDate,EmployeeID,Freight,LineTotal,OrderDate,OrderQty,ProductID,PurchaseOrderID,ReceivedQty,RejectedQty,RevisionNumber,ShipBase,ShipDate,ShipMethod,ShipRate,Status,StockedQty,SubTotal,TaxAmt,TotalDue,UnitPrice,VendorID,_rescued_data
2001-05-31 00:00:00,241,221.1825,8847.3,2001-05-17 00:00:00,550,530,3,550.0,0.0,0,9.95,2001-05-26 00:00:00,ZY - EXPRESS,1.99,4,550.0,8847.3,707.784,9776.2665,16.086,38,
2001-06-14 00:00:00,223,365.7019,14628.075,2001-05-31 00:00:00,550,513,6,468.0,0.0,0,29.95,2001-06-09 00:00:00,OVERSEAS - DELUXE,2.99,4,468.0,14628.075,1170.246,16164.0229,26.5965,11,
2002-01-28 00:00:00,261,17.3541,129.969,2002-01-14 00:00:00,3,426,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,43.323,74,
2002-01-28 00:00:00,261,17.3541,136.269,2002-01-14 00:00:00,3,425,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,45.423,74,
2002-01-28 00:00:00,261,17.3541,149.0895,2002-01-14 00:00:00,3,424,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,49.6965,74,
2002-01-28 00:00:00,261,17.3541,136.269,2002-01-14 00:00:00,3,423,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,45.423,74,
2002-01-28 00:00:00,261,17.3541,142.569,2002-01-14 00:00:00,3,422,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,47.523,74,
2002-01-28 00:00:00,231,866.1056,34644.225,2002-01-14 00:00:00,550,941,12,550.0,82.0,0,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,468.0,34644.225,2771.538,38281.8686,62.9895,80,
2002-01-28 00:00:00,164,2.5641,102.564,2002-01-14 00:00:00,3,497,15,3.0,0.0,0,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,102.564,8.2051,113.3332,34.188,46,
2002-01-29 00:00:00,238,409.8308,2934.54,2002-01-15 00:00:00,60,496,18,60.0,0.0,0,8.99,2002-01-24 00:00:00,CARGO TRANSPORT 5,1.49,4,60.0,16393.23,1311.4584,18114.5192,48.909,86,


In [0]:
(spark.table("purchase_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f7c41f5e7d0>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_purchase_orders_bronze")
  .createOrReplaceTempView("purchase_orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM purchase_orders_silver_tempview

DueDate,EmployeeID,Freight,LineTotal,OrderDate,OrderQty,ProductID,PurchaseOrderID,ReceivedQty,RejectedQty,RevisionNumber,ShipBase,ShipDate,ShipMethod,ShipRate,Status,StockedQty,SubTotal,TaxAmt,TotalDue,UnitPrice,VendorID,_rescued_data
2001-05-31 00:00:00,241,221.1825,8847.3,2001-05-17 00:00:00,550,530,3,550.0,0.0,0,9.95,2001-05-26 00:00:00,ZY - EXPRESS,1.99,4,550.0,8847.3,707.784,9776.2665,16.086,38,
2001-06-14 00:00:00,223,365.7019,14628.075,2001-05-31 00:00:00,550,513,6,468.0,0.0,0,29.95,2001-06-09 00:00:00,OVERSEAS - DELUXE,2.99,4,468.0,14628.075,1170.246,16164.0229,26.5965,11,
2002-01-28 00:00:00,261,17.3541,129.969,2002-01-14 00:00:00,3,426,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,43.323,74,
2002-01-28 00:00:00,261,17.3541,136.269,2002-01-14 00:00:00,3,425,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,45.423,74,
2002-01-28 00:00:00,261,17.3541,149.0895,2002-01-14 00:00:00,3,424,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,49.6965,74,
2002-01-28 00:00:00,261,17.3541,136.269,2002-01-14 00:00:00,3,423,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,45.423,74,
2002-01-28 00:00:00,261,17.3541,142.569,2002-01-14 00:00:00,3,422,9,3.0,0.0,1,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,694.1655,55.5332,767.0528,47.523,74,
2002-01-28 00:00:00,231,866.1056,34644.225,2002-01-14 00:00:00,550,941,12,550.0,82.0,0,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,468.0,34644.225,2771.538,38281.8686,62.9895,80,
2002-01-28 00:00:00,164,2.5641,102.564,2002-01-14 00:00:00,3,497,15,3.0,0.0,0,8.99,2002-01-23 00:00:00,CARGO TRANSPORT 5,1.49,4,3.0,102.564,8.2051,113.3332,34.188,46,
2002-01-29 00:00:00,238,409.8308,2934.54,2002-01-15 00:00:00,60,496,18,60.0,0.0,0,8.99,2002-01-24 00:00:00,CARGO TRANSPORT 5,1.49,4,60.0,16393.23,1311.4584,18114.5192,48.909,86,


In [0]:
%sql
DESCRIBE EXTENDED purchase_orders_silver_tempview

col_name,data_type,comment
DueDate,string,
EmployeeID,string,
Freight,string,
LineTotal,string,
OrderDate,string,
OrderQty,string,
ProductID,string,
PurchaseOrderID,string,
ReceivedQty,string,
RejectedQty,string,


This SQL statement creates a temporary Silver-layer view called purchase_orders_silver_tempview, which enriches raw purchase order data from the Bronze layer by joining it with several dimension tables. The view adds descriptive information such as the vendor's name, product name, and employee details, and includes role-playing joins on the dim_date table to provide full date context for the order, ship, and due dates.

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW purchase_orders_silver_tempview AS
SELECT 
  po.PurchaseOrderID AS PurchaseOrderKey,
  po.VendorID AS VendorKey,
  v.Name AS VendorName,
  
  po.EmployeeID AS EmployeeKey,
  e.FirstName AS EmployeeFirstName,
  e.LastName AS EmployeeLastName,
  e.Gender AS EmployeeGender,

  po.ProductID AS ProductKey,
  p.Name AS ProductName,

  po.OrderDate,
  od.full_date AS OrderDate_Full,
  
  po.ShipDate,
  sd.full_date AS ShipDate_Full,

  po.DueDate,
  dd.full_date AS DueDate_Full,

  po.OrderQty,
  po.UnitPrice,
  po.LineTotal,
  po.Freight,
  po.SubTotal,
  po.TaxAmt,
  po.TotalDue,
  po.ShipMethod,
  po.ShipBase,
  po.ShipRate,
  po.ReceivedQty,
  po.RejectedQty,
  po.StockedQty,
  po.RevisionNumber,
  po.Status,
  
  current_timestamp() AS row_ingestion_timestamp

FROM purchase_orders_bronze_tempview po
LEFT JOIN dim_vendor v
  ON po.VendorID = v.VendorID
LEFT JOIN dim_employee e
  ON po.EmployeeID = e.EmployeeID
LEFT JOIN dim_product p
  ON po.ProductID = p.ProductID
LEFT JOIN dim_date od
  ON po.OrderDate = od.full_date
LEFT JOIN dim_date sd
  ON po.ShipDate = sd.full_date
LEFT JOIN dim_date dd
  ON po.DueDate = dd.full_date


In [0]:
(spark.table("purchase_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f7c602d8fd0>

In [0]:
%sql
SELECT * FROM fact_purchase_orders_silver

PurchaseOrderKey,VendorKey,VendorName,EmployeeKey,EmployeeFirstName,EmployeeLastName,EmployeeGender,ProductKey,ProductName,OrderDate,OrderDate_Full,ShipDate,ShipDate_Full,DueDate,DueDate_Full,OrderQty,UnitPrice,LineTotal,Freight,SubTotal,TaxAmt,TotalDue,ShipMethod,ShipBase,ShipRate,ReceivedQty,RejectedQty,StockedQty,RevisionNumber,Status,row_ingestion_timestamp
3,38,Allenson Cycles,241,Eric,Kurjan,M,530,Seat Post,2001-05-17 00:00:00,2001-05-17,2001-05-26 00:00:00,2001-05-26,2001-05-31 00:00:00,2001-05-31,550,16.086,8847.3,221.1825,8847.3,707.784,9776.2665,ZY - EXPRESS,9.95,1.99,550.0,0.0,550.0,0,4,2025-05-09T17:00:00.04Z
6,11,Anderson's Custom Bikes,223,Linda,Meisner,F,513,Touring Rim,2001-05-31 00:00:00,2001-05-31,2001-06-09 00:00:00,2001-06-09,2001-06-14 00:00:00,2001-06-14,550,26.5965,14628.075,365.7019,14628.075,1170.246,16164.0229,OVERSEAS - DELUXE,29.95,2.99,468.0,0.0,468.0,0,4,2025-05-09T17:00:00.04Z
9,74,Australia Bike Retailer,261,Ben,Miller,M,426,Thin-Jam Lock Nut 15,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,3,43.323,129.969,17.3541,694.1655,55.5332,767.0528,CARGO TRANSPORT 5,8.99,1.49,3.0,0.0,3.0,1,4,2025-05-09T17:00:00.04Z
9,74,Australia Bike Retailer,261,Ben,Miller,M,425,Thin-Jam Lock Nut 2,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,3,45.423,136.269,17.3541,694.1655,55.5332,767.0528,CARGO TRANSPORT 5,8.99,1.49,3.0,0.0,3.0,1,4,2025-05-09T17:00:00.04Z
9,74,Australia Bike Retailer,261,Ben,Miller,M,424,Thin-Jam Lock Nut 1,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,3,49.6965,149.0895,17.3541,694.1655,55.5332,767.0528,CARGO TRANSPORT 5,8.99,1.49,3.0,0.0,3.0,1,4,2025-05-09T17:00:00.04Z
9,74,Australia Bike Retailer,261,Ben,Miller,M,423,Thin-Jam Lock Nut 10,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,3,45.423,136.269,17.3541,694.1655,55.5332,767.0528,CARGO TRANSPORT 5,8.99,1.49,3.0,0.0,3.0,1,4,2025-05-09T17:00:00.04Z
9,74,Australia Bike Retailer,261,Ben,Miller,M,422,Thin-Jam Lock Nut 9,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,3,47.523,142.569,17.3541,694.1655,55.5332,767.0528,CARGO TRANSPORT 5,8.99,1.49,3.0,0.0,3.0,1,4,2025-05-09T17:00:00.04Z
12,80,Bicycle Specialists,231,Fukiko,Ogisu,M,941,Touring Pedal,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,550,62.9895,34644.225,866.1056,34644.225,2771.538,38281.8686,CARGO TRANSPORT 5,8.99,1.49,550.0,82.0,468.0,0,4,2025-05-09T17:00:00.04Z
15,46,Burnett Road Warriors,164,Mikael,Sandberg,M,497,Pinch Bolt,2002-01-14 00:00:00,2002-01-14,2002-01-23 00:00:00,2002-01-23,2002-01-28 00:00:00,2002-01-28,3,34.188,102.564,2.5641,102.564,8.2051,113.3332,CARGO TRANSPORT 5,8.99,1.49,3.0,0.0,3.0,0,4,2025-05-09T17:00:00.04Z
18,86,Carlson Specialties,238,Frank,Pellow,M,496,Paint - Yellow,2002-01-15 00:00:00,2002-01-15,2002-01-24 00:00:00,2002-01-24,2002-01-29 00:00:00,2002-01-29,60,48.909,2934.54,409.8308,16393.23,1311.4584,18114.5192,CARGO TRANSPORT 5,8.99,1.49,60.0,0.0,60.0,0,4,2025-05-09T17:00:00.04Z


In [0]:
%sql
DESCRIBE EXTENDED fact_purchase_orders_silver

col_name,data_type,comment
PurchaseOrderKey,string,
VendorKey,string,
VendorName,string,
EmployeeKey,string,
EmployeeFirstName,string,
EmployeeLastName,string,
EmployeeGender,string,
ProductKey,string,
ProductName,varchar(50),
OrderDate,string,


#### Gold Table: Perform Aggregations
##### I created a new Gold table that includes the total amount (total list price) of the purchase orders placed per vendor for each product. The final table lists Vendor' Company Name, the Product Name, and the Total List Price in descending order.

In [0]:
%sql
SELECT 
  VendorName AS Vendor_Company_Name,
  ProductName,
  SUM(LineTotal) AS Total_List_Price
FROM purchase_orders_silver_tempview
GROUP BY VendorName,ProductName
ORDER BY Total_List_Price DESC

Vendor_Company_Name,ProductName,Total_List_Price
Superior Bicycles,Rear Brakes,1457887.1999999995
Superior Bicycles,Front Brakes,1457887.1999999995
Bicycle Specialists,Touring Pedal,1247192.0999999996
Inline Accessories,HL Mountain Pedal,1177903.6499999997
"Compete Enterprises, Inc",HL Road Pedal,1039326.7499999997
Greenwood Athletic Company,ML Mountain Pedal,903013.6499999996
Crowley Sport,ML Mountain Pedal,903013.6499999996
Mitchell Sports,ML Road Pedal,849895.1999999997
West Junction Cycles,HL Crankarm,817000.8000000002
"Trikes, Inc.",HL Mountain Tire,795021.1499999998
