# Final Project
Note: Place the final_data folder under FileStore in the DBFS

Note: Sometimes when you run all the cells, the flow of the notebook will be stalled at the SQL query : "SELECT * FROM sales_orders_bronze_tempview". You can interrupt this cell. Then, scroll down to the gold table and press run to see the gold table. 

#### Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "anh6ee-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "adventureworks"

connection_properties = {
  "user" : "nsuresh",
  "password" : "Centreville8",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.i1ocl"
atlas_database_name = "adventureworks"
atlas_user_name = "nithilsuresh8"
atlas_password = "Centreville8"

# Data Files (JSON) Information ###############################
dst_database = "adventure_works_dlh"

base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sales"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/sales_orders"

orders_output_bronze = f"{database_dir}/fact_sales_orders/bronze"
orders_output_silver = f"{database_dir}/fact_sales_orders/silver"
orders_output_gold   = f"{database_dir}/fact_sales_orders/gold"


# # Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales_orders", True) 

# # Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS adventure_works_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS adventure_works_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/final_data/adventure_works_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://anh6ee-mysql.mysql.database.azure.com:3306/adventure_dw", 
  dbtable "dim_date",
  user "nsuresh",    
  password "Centreville8" 
)

In [0]:
%sql
USE DATABASE adventure_works_dlh;

CREATE OR REPLACE TABLE adventure_works_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/adventure_works_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED adventure_works_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM adventure_works_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://anh6ee-mysql.mysql.database.azure.com:3306/adventure_dw",
  dbtable "dim_customers",
  user "nsuresh",    
  password "Centreville8"  
)

In [0]:
%sql
USE DATABASE adventure_works_dlh;

CREATE OR REPLACE TABLE adventure_works_dlh.dim_customers
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/final_data/adventure_works_dlh/dim_customers"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED adventure_works_dlh.dim_customers;

col_name,data_type,comment
customer_key,bigint,
CustomerID,bigint,
TerritoryID,bigint,
AccountNumber,varchar(65535),
CustomerType,varchar(65535),
ModifiedDate_key,bigint,
,,
# Delta Statistics Columns,,
Column Names,"AccountNumber, CustomerType, CustomerID, ModifiedDate_key, TerritoryID, customer_key",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM adventure_works_dlh.dim_customers LIMIT 5

customer_key,CustomerID,TerritoryID,AccountNumber,CustomerType,ModifiedDate_key
1,1,1,AW00000001,S,20041013
2,2,1,AW00000002,S,20041013
3,3,4,AW00000003,S,20041013
4,4,4,AW00000004,S,20041013
5,5,4,AW00000005,S,20041013


#### Fetch Reference Data from a MongoDB Atlas Database
##### View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/final_data/sales/batch/products_js.csv,products_js.csv,67228,1733620315000
dbfs:/FileStore/final_data/sales/batch/ship_methods.json,ship_methods.json,780,1733620315000


%md
##### Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/final_data/sales/batch'
json_files = {"ship_methods" : 'ship_methods.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f3b89af7bc0>

##### Fetch Ship Methods Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "nithilsuresh8"
val pwd = "Centreville8"
val clusterName = "Cluster0.i1ocl"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_ship_methods = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "adventureworks")
.option("collection", "ship_methods").load()
.select("ship_method_key","ShipMethodID","Name","ShipBase","ShipRate")

display(df_ship_methods)

ship_method_key,ShipMethodID,Name,ShipBase,ShipRate
1,1,XRQ - TRUCK GROUND,3.95,0.99
2,2,ZY - EXPRESS,9.95,1.99
3,3,OVERSEAS - DELUXE,29.95,2.99
4,4,OVERNIGHT J-FAST,21.95,1.29
5,5,CARGO TRANSPORT 5,8.99,1.49


In [0]:
%scala
df_ship_methods.printSchema()

##### Use the Spark DataFrame to Create a New Ship Methods Dimension Table in the Databricks Metadata Database (adventure_works_dlh)

In [0]:
%scala
df_ship_methods.write.format("delta").mode("overwrite").saveAsTable("adventure_works_dlh.dim_ship_methods")

In [0]:
%sql
DESCRIBE EXTENDED adventure_works_dlh.dim_ship_methods

col_name,data_type,comment
ship_method_key,int,
ShipMethodID,int,
Name,string,
ShipBase,double,
ShipRate,double,
,,
# Delta Statistics Columns,,
Column Names,"ShipRate, Name, ship_method_key, ShipBase, ShipMethodID",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM adventure_works_dlh.dim_ship_methods LIMIT 5

ship_method_key,ShipMethodID,Name,ShipBase,ShipRate
1,1,XRQ - TRUCK GROUND,3.95,0.99
2,2,ZY - EXPRESS,9.95,1.99
3,3,OVERSEAS - DELUXE,29.95,2.99
4,4,OVERNIGHT J-FAST,21.95,1.29
5,5,CARGO TRANSPORT 5,8.99,1.49


#### Fetch Data from a File System
##### Use PySpark to Read From a CSV File

In [0]:
product_csv = f"{batch_dir}/products_js.csv"

df_product = spark.read.format('csv').options(header='true', inferSchema='true').load(product_csv)
display(df_product)

product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate_key,SellEndDate_key,ModifiedDate_key
1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,,,,,0,,,,,,,,20040311
2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,,,,,0,,,,,,,,20040311
3,3,BB Ball Bearing,BE-2349,1,0,,800,600,0.0,0.0,,,,,1,,,,,,,,20040311
4,4,Headset Ball Bearings,BE-2908,0,0,,800,600,0.0,0.0,,,,,0,,,,,,,,20040311
5,316,Blade,BL-2036,1,0,,800,600,0.0,0.0,,,,,1,,,,,,,,20040311
6,317,LL Crankarm,CA-5965,0,0,Black,500,375,0.0,0.0,,,,,0,,L,,,,,,20040311
7,318,ML Crankarm,CA-6738,0,0,Black,500,375,0.0,0.0,,,,,0,,M,,,,,,20040311
8,319,HL Crankarm,CA-7457,0,0,Black,500,375,0.0,0.0,,,,,0,,,,,,,,20040311
9,320,Chainring Bolts,CB-2903,0,0,Silver,1000,750,0.0,0.0,,,,,0,,,,,,,,20040311
10,321,Chainring Nut,CN-6137,0,0,Silver,1000,750,0.0,0.0,,,,,0,,,,,,,,20040311


In [0]:
df_product.printSchema()

root
 |-- product_key: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: integer (nullable = true)
 |-- FinishedGoodsFlag: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)
 |-- StandardCost: double (nullable = true)
 |-- ListPrice: double (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- DaysToManufacture: integer (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductSubcategoryID: string (nullable = true)
 |-- ProductModelID: string (nullable = true)
 |-- SellStartDate_key: string (nullable = true)
 |-- SellEndDate_key: string 

In [0]:
df_product.write.format("delta").mode("overwrite").saveAsTable("adventure_works_dlh.dim_products")

In [0]:
%sql
DESCRIBE EXTENDED adventure_works_dlh.dim_products;

col_name,data_type,comment
product_key,int,
ProductID,int,
Name,string,
ProductNumber,string,
MakeFlag,int,
FinishedGoodsFlag,int,
Color,string,
SafetyStockLevel,int,
ReorderPoint,int,
StandardCost,double,


In [0]:
%sql
SELECT * FROM adventure_works_dlh.dim_products LIMIT 5;

product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate_key,SellEndDate_key,ModifiedDate_key
1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,,,,,0,,,,,,,,20040311
2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,,,,,0,,,,,,,,20040311
3,3,BB Ball Bearing,BE-2349,1,0,,800,600,0.0,0.0,,,,,1,,,,,,,,20040311
4,4,Headset Ball Bearings,BE-2908,0,0,,800,600,0.0,0.0,,,,,0,,,,,,,,20040311
5,316,Blade,BL-2036,1,0,,800,600,0.0,0.0,,,,,1,,,,,,,,20040311


##### Verify Dimension Tables

In [0]:
%sql
USE adventure_works_dlh;
SHOW TABLES

database,tableName,isTemporary
adventure_works_dlh,dim_customers,False
adventure_works_dlh,dim_date,False
adventure_works_dlh,dim_products,False
adventure_works_dlh,dim_ship_methods,False
,_sqldf,True
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True


### Integrate Reference Data with Real-Time Data
#### Use AutoLoader to Process Streaming (Hot Path) Sales Orders Fact Data 
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "fact_sales_order_key BIGINT")
 .option("cloudFiles.schemaHints", "SalesOrderID BIGINT")
 .option("cloudFiles.schemaHints", "SalesOrderDetailID BIGINT")
 .option("cloudFiles.schemaHints", "OrderDate_key BIGINT") 
 .option("cloudFiles.schemaHints", "DueDate_key BIGINT")
 .option("cloudFiles.schemaHints", "ShipDate_key BIGINT")
 .option("cloudFiles.schemaHints", "ModifiedDate_key BIGINT")
 .option("cloudFiles.schemaHints", "product_key BIGINT")
 .option("cloudFiles.schemaHints", "ship_method_key BIGINT") 
 .option("cloudFiles.schemaHints", "CarrierTrackingNumber TEXT")
 .option("cloudFiles.schemaHints", "OrderQty BIGINT")
 .option("cloudFiles.schemaHints", "SpecialOfferID BIGINT")
 .option("cloudFiles.schemaHints", "UnitPrice DOUBLE")
 .option("cloudFiles.schemaHints", "UnitPriceDiscount DOUBLE")
 .option("cloudFiles.schemaHints", "LineTotal DOUBLE")
 .option("cloudFiles.schemaHints", "RevisionNumber BIGINT")
 .option("cloudFiles.schemaHints", "Status BIGINT")
 .option("cloudFiles.schemaHints", "SalesOrderNumber TEXT")
 .option("cloudFiles.schemaHints", "PurchaseOrderNumber TEXT")
 .option("cloudFiles.schemaHints", "AccountNumber TEXT")
 .option("cloudFiles.schemaHints", "ContactID BIGINT") 
 .option("cloudFiles.schemaHints", "SalesPersonID DOUBLE")
 .option("cloudFiles.schemaHints", "TerritoryID BIGINT")
 .option("cloudFiles.schemaHints", "BillToAddressID BIGINT")
 .option("cloudFiles.schemaHints", "ShipToAddressID BIGINT")
 .option("cloudFiles.schemaHints", "CreditCardID DOUBLE")
 .option("cloudFiles.schemaHints", "CreditCardApprovalCode TEXT")
 .option("cloudFiles.schemaHints", "CurrencyRateID DOUBLE")
 .option("cloudFiles.schemaHints", "SubTotal DOUBLE")
 .option("cloudFiles.schemaHints", "TaxAmt DOUBLE")
 .option("cloudFiles.schemaHints", "Freight DOUBLE")
 .option("cloudFiles.schemaHints", "TotalDue DOUBLE")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("sales_orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW sales_orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM sales_orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM sales_orders_bronze_tempview

AccountNumber,BillToAddressID,CarrierTrackingNumber,ContactID,CreditCardApprovalCode,CreditCardID,CurrencyRateID,DueDate_key,Freight,LineTotal,ModifiedDate_key,OrderDate_key,OrderQty,PurchaseOrderNumber,RevisionNumber,SalesOrderDetailID,SalesOrderID,SalesOrderNumber,SalesPersonID,ShipDate_key,ShipToAddressID,SpecialOfferID,Status,SubTotal,TaxAmt,TerritoryID,TotalDue,UnitPrice,UnitPriceDiscount,customer_key,fact_sales_order_key,product_key,ship_method_key,_rescued_data,receipt_time,source_file
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,5.496,20030808,20030801,4,PO11368156185,1,40440,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,1.374,0.0,687,40440,378,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,599.496,20030808,20030801,4,PO11368156185,1,40441,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,149.874,0.0,687,40441,430,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,323.94,20030808,20030801,10,PO11368156185,1,40442,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,32.394,0.0,687,40442,386,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,299.748,20030808,20030801,2,PO11368156185,1,40443,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,149.874,0.0,687,40443,432,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,47.7,20030808,20030801,10,PO11368156185,1,40444,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,4.77,0.0,687,40444,382,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,8261.964,20030808,20030801,6,PO11368156185,1,40445,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,1376.994,0.0,687,40445,289,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,106.896,20030808,20030801,4,PO11368156185,1,40446,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,26.724,0.0,687,40446,313,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,1637.4,20030808,20030801,2,PO11368156185,1,40447,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,818.7,0.0,687,40447,244,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,119.976,20030808,20030801,4,PO11368156185,1,40448,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,29.994,0.0,687,40448,221,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,323.994,20030808,20030801,1,PO11368156185,1,40449,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,323.994,0.0,687,40449,497,5,,2024-12-08T01:14:22.457Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json


In [0]:
(spark.table("sales_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3b685d8c50>

##### Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_sales_orders_bronze")
  .createOrReplaceTempView("sales_orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM sales_orders_silver_tempview

AccountNumber,BillToAddressID,CarrierTrackingNumber,ContactID,CreditCardApprovalCode,CreditCardID,CurrencyRateID,DueDate_key,Freight,LineTotal,ModifiedDate_key,OrderDate_key,OrderQty,PurchaseOrderNumber,RevisionNumber,SalesOrderDetailID,SalesOrderID,SalesOrderNumber,SalesPersonID,ShipDate_key,ShipToAddressID,SpecialOfferID,Status,SubTotal,TaxAmt,TerritoryID,TotalDue,UnitPrice,UnitPriceDiscount,customer_key,fact_sales_order_key,product_key,ship_method_key,_rescued_data,receipt_time,source_file
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,5.496,20030808,20030801,4,PO11368156185,1,40440,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,1.374,0.0,687,40440,378,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,599.496,20030808,20030801,4,PO11368156185,1,40441,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,149.874,0.0,687,40441,430,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,323.94,20030808,20030801,10,PO11368156185,1,40442,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,32.394,0.0,687,40442,386,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,299.748,20030808,20030801,2,PO11368156185,1,40443,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,149.874,0.0,687,40443,432,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,47.7,20030808,20030801,10,PO11368156185,1,40444,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,4.77,0.0,687,40444,382,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,8261.964,20030808,20030801,6,PO11368156185,1,40445,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,1376.994,0.0,687,40445,289,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,106.896,20030808,20030801,4,PO11368156185,1,40446,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,26.724,0.0,687,40446,313,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,1637.4,20030808,20030801,2,PO11368156185,1,40447,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,818.7,0.0,687,40447,244,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,119.976,20030808,20030801,4,PO11368156185,1,40448,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,29.994,0.0,687,40448,221,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json
10-4020-000687,754,257C-4EC8-98,272,86303Vi65437,12648,8745.0,20030813,1758.4676,323.994,20030808,20030801,1,PO11368156185,1,40449,51751,SO51751,289,20030808,754,1,5,70338.7056,5627.0964,8,77724.2696,323.994,0.0,687,40449,497,5,,2024-12-08T01:14:23.641Z,dbfs:/FileStore/final_data/sales/stream/sales_orders/fact_sales_orders2.json


In [0]:
%sql
DESCRIBE EXTENDED sales_orders_silver_tempview

col_name,data_type,comment
AccountNumber,string,
BillToAddressID,bigint,
CarrierTrackingNumber,string,
ContactID,bigint,
CreditCardApprovalCode,string,
CreditCardID,bigint,
CurrencyRateID,bigint,
DueDate_key,bigint,
Freight,double,
LineTotal,double,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_sales_orders_silver_tempview AS (
  SELECT so.fact_sales_order_key,
        so.SalesOrderID,
        so.product_key,
        p.Name AS product_name,
        so.customer_key,
        c.AccountNumber,
        c.CustomerType,
        so.ship_method_key,
        sm.Name AS ship_method_name,
        sm.ShipBase,
        sm.ShipRate,
        so.OrderDate_key,
        od.day_name_of_week AS order_day_name_of_week,
        od.day_of_month AS order_day_of_month,
        od.weekday_weekend AS order_weekday_weekend,
        od.month_name AS order_month_name,
        od.calendar_quarter AS order_quarter,
        od.calendar_year AS order_year,
        so.DueDate_key,
        dd.day_name_of_week AS order_due_day_name_of_week,
        dd.day_of_month AS order_due_day_of_month,
        dd.weekday_weekend AS order_due_weekday_weekend,
        dd.month_name AS order_due_month_name,
        dd.calendar_quarter AS order_due_quarter,
        dd.calendar_year AS order_due_year,
        so.ShipDate_key,
        sd.day_name_of_week AS order_ship_day_name_of_week,
        sd.day_of_month AS order_ship_day_of_month,
        sd.weekday_weekend AS order_ship_weekday_weekend,
        sd.month_name AS order_ship_month_name,
        sd.calendar_quarter AS order_ship_quarter,
        sd.calendar_year AS order_ship_year,
        so.ModifiedDate_key,
        md.day_name_of_week AS modified_day_name_of_week,
        md.day_of_month AS modified_day_of_month,
        md.weekday_weekend AS modified_weekday_weekend,
        md.month_name AS modified_month_name,
        md.calendar_quarter AS modified_ship_quarter,
        md.calendar_year AS modified_ship_year,
        so.OrderQty,
        so.UnitPrice,
        so.UnitPriceDiscount,
        so.LineTotal,
        so.SubTotal,
        so.TaxAmt,
        so.Freight,
        so.TotalDue
  FROM sales_orders_silver_tempview AS so
  INNER JOIN adventure_works_dlh.dim_products AS p
  ON p.product_key = so.product_key
  INNER JOIN adventure_works_dlh.dim_customers AS c
  ON c.customer_key = so.customer_key
  INNER JOIN adventure_works_dlh.dim_ship_methods AS sm
  ON sm.ship_method_key = so.ship_method_key
  LEFT OUTER JOIN adventure_works_dlh.dim_date AS od
  ON od.date_key = so.OrderDate_key
  LEFT OUTER JOIN adventure_works_dlh.dim_date AS dd
  ON dd.date_key = so.DueDate_key
  LEFT OUTER JOIN adventure_works_dlh.dim_date AS sd
  ON sd.date_key = so.ShipDate_key
  LEFT OUTER JOIN adventure_works_dlh.dim_date AS md
  ON md.date_key = so.ModifiedDate_key
)

In [0]:
(spark.table("fact_sales_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3b6045cb90>

In [0]:
%sql
SELECT * FROM fact_sales_orders_silver

fact_sales_order_key,SalesOrderID,product_key,product_name,customer_key,AccountNumber,CustomerType,ship_method_key,ship_method_name,ShipBase,ShipRate,OrderDate_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,DueDate_key,order_due_day_name_of_week,order_due_day_of_month,order_due_weekday_weekend,order_due_month_name,order_due_quarter,order_due_year,ShipDate_key,order_ship_day_name_of_week,order_ship_day_of_month,order_ship_weekday_weekend,order_ship_month_name,order_ship_quarter,order_ship_year,ModifiedDate_key,modified_day_name_of_week,modified_day_of_month,modified_weekday_weekend,modified_month_name,modified_ship_quarter,modified_ship_year,OrderQty,UnitPrice,UnitPriceDiscount,LineTotal,SubTotal,TaxAmt,Freight,TotalDue
40440,51751,378,Patch Kit/8 Patches,687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,4,1.374,0.0,5.496,70338.7056,5627.0964,1758.4676,77724.2696
40441,51751,430,"LL Mountain Frame - Black, 44",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,4,149.874,0.0,599.496,70338.7056,5627.0964,1758.4676,77724.2696
40442,51751,386,"Short-Sleeve Classic Jersey, S",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,10,32.394,0.0,323.94,70338.7056,5627.0964,1758.4676,77724.2696
40443,51751,432,"LL Mountain Frame - Black, 52",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,2,149.874,0.0,299.748,70338.7056,5627.0964,1758.4676,77724.2696
40444,51751,382,Bike Wash - Dissolver,687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,10,4.77,0.0,47.7,70338.7056,5627.0964,1758.4676,77724.2696
40445,51751,289,"Mountain-200 Black, 46",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,6,1376.994,0.0,8261.964,70338.7056,5627.0964,1758.4676,77724.2696
40446,51751,313,LL Mountain Handlebars,687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,4,26.724,0.0,106.896,70338.7056,5627.0964,1758.4676,77724.2696
40447,51751,244,"HL Mountain Frame - Silver, 42",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,2,818.7,0.0,1637.4,70338.7056,5627.0964,1758.4676,77724.2696
40448,51751,221,"Long-Sleeve Logo Jersey, XL",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,4,29.994,0.0,119.976,70338.7056,5627.0964,1758.4676,77724.2696
40449,51751,497,"Mountain-500 Black, 48",687,AW00000687,S,5,CARGO TRANSPORT 5,8.99,1.49,20030801,Friday,1,Weekday,August,3,2003,20030813,Wednesday,13,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,20030808,Friday,8,Weekday,August,3,2003,1,323.994,0.0,323.994,70338.7056,5627.0964,1758.4676,77724.2696


In [0]:
%sql
DESCRIBE EXTENDED adventure_works_dlh.fact_sales_orders_silver

col_name,data_type,comment
fact_sales_order_key,bigint,
SalesOrderID,bigint,
product_key,bigint,
product_name,string,
customer_key,bigint,
AccountNumber,varchar(65535),
CustomerType,varchar(65535),
ship_method_key,bigint,
ship_method_name,string,
ShipBase,double,


##### Gold Table: Perform Aggregations

Create a new Gold table using the CTAS approach. The table tells us what products sold the best in what month. At the top of the table, we can see that the Long-Sleeve Logo Jersey, size L, was bought 36 times in the month of August. From this table, we can tell that most products are bought in August. 

In [0]:
%sql
CREATE OR REPLACE TABLE adventure_works_dlh.fact_monthly_orders_by_product_gold AS (
  SELECT product_key AS ProductID
    , product_name AS ProductName
    , order_month_name AS OrderMonth
    , COUNT(product_key) AS ProductCount
  FROM adventure_works_dlh.fact_sales_orders_silver
  GROUP BY ProductID, ProductName, OrderMonth
  ORDER BY ProductCount DESC);

SELECT * FROM adventure_works_dlh.fact_monthly_orders_by_product_gold;

ProductID,ProductName,OrderMonth,ProductCount
220,"Long-Sleeve Logo Jersey, L",August,36
217,AWC Logo Cap,August,34
216,"Sport-100 Helmet, Blue",August,31
212,"Sport-100 Helmet, Red",August,29
243,"LL Road Frame - Black, 52",August,28
213,"Sport-100 Helmet, Black",August,28
219,"Long-Sleeve Logo Jersey, M",August,22
221,"Long-Sleeve Logo Jersey, XL",August,21
375,Water Bottle - 30 oz.,August,20
381,Hitch Rack - 4-Bike,August,19


#### Clean up the File System

In [0]:
# %fs rm -r /FileStore/final_data/