## DS-3002: Sample Capstone Project
This notebook demonstrates many of the software libraries and programming techniques needed to fulfill the requirements of the end-of-session capstone project for course **DS-3002: Data Systems** at the University of Virginia School of Data Science. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Relational Databases Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Relational Databases Optimized for Read/Aggregation Operations; Dimensional Model (e.g., Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)
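
To make the OLTP/OLAP distinction above concrete, here is a minimal star-schema sketch that uses Python's built-in `sqlite3` module as a stand-in for an OLAP database. The tables (`fact_sales`, `dim_product`, `dim_date`) and their rows are hypothetical and far smaller than the dimensional model built later in this notebook; the point is only the shape of the query: a fact table joined to its dimensions on keys, then aggregated.

```python
import sqlite3

# Hypothetical miniature star schema: one fact table keyed to two dimension tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_product (product_key INTEGER PRIMARY KEY, product_line TEXT);
    CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, calendar_year INTEGER);
    CREATE TABLE fact_sales (
        product_key INTEGER REFERENCES dim_product(product_key),
        date_key INTEGER REFERENCES dim_date(date_key),
        quantity INTEGER,
        amount REAL
    );
    INSERT INTO dim_product VALUES (1, 'Motorcycles'), (2, 'Classic Cars');
    INSERT INTO dim_date VALUES (20030101, 2003), (20040101, 2004);
    INSERT INTO fact_sales VALUES
        (1, 20030101, 2, 100.0),
        (2, 20030101, 1, 250.0),
        (1, 20040101, 3, 150.0);
""")

# A typical OLAP query: join the fact table to its dimensions, then aggregate.
rows = conn.execute("""
    SELECT p.product_line, d.calendar_year, SUM(f.amount) AS total_amount
    FROM fact_sales f
    JOIN dim_product p ON f.product_key = p.product_key
    JOIN dim_date d ON f.date_key = d.date_key
    GROUP BY p.product_line, d.calendar_year
    ORDER BY p.product_line, d.calendar_year
""").fetchall()

for row in rows:
    print(row)
conn.close()
```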

What's more, this project requires students to make effective decisions about whether to implement a cloud-hosted, on-premises, or hybrid architecture.

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [None]:
import os
import json
import pymongo
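import pandas as pd  # Used by get_mongo_dataframe() to build a DataFrame from PyMongo query results
#import pyspark.pandas as ps  # pandas API on Spark (formerly Koalas), included in PySpark 3.2 and newer.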
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [None]:
# Azure Database for MySQL Connection Information #############
jdbc_hostname = "ds-2002-final-2.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels"

connection_properties = {
  "user" : "msqldatabricks",
  "password" : "Lyella227",
  "driver" : "org.mariadb.jdbc.Driver" 
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0"
atlas_database_name = "classicmodels"
atlas_user_name = "emrkraisinger"
atlas_password = "Lyella227"

# Data Files (JSON) Information ###############################
dst_database = "classicmodels"

base_dir = "dbfs:/FileStore/ds3002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_sales_orders/bronze"
output_silver = f"{database_dir}/fact_sales_orders/silver"
output_gold   = f"{database_dir}/fact_sales_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales_orders", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [None]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure Database for MySQL server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    # Create a JDBC URL to the Azure Database for MySQL server
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    # Invoke spark.read.jdbc() to query the database and return a Spark DataFrame.
    # sql_query may be a table name or a parenthesized subquery with an alias.
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe
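
`get_sql_dataframe()` hands `sql_query` to the `table` argument of `spark.read.jdbc()`. Spark places that value in a `FROM` clause, so it can be a table name or a parenthesized subquery with an alias. The two helpers below (`build_mysql_jdbc_url` and `as_jdbc_subquery`) are hypothetical, plain-Python sketches of how those strings might be assembled; they do not call Spark.

```python
def build_mysql_jdbc_url(host_name: str, port: int, db_name: str) -> str:
    """Return a MySQL-style JDBC URL: jdbc:mysql://<host>:<port>/<database>."""
    return f"jdbc:mysql://{host_name}:{port}/{db_name}"


def as_jdbc_subquery(sql_query: str, alias: str = "src") -> str:
    """Wrap a SELECT statement so it can be used where a table name is expected.

    The value ends up inside a FROM clause, so a query must be parenthesized
    and given an alias; a trailing semicolon would be a syntax error there.
    """
    return f"({sql_query.strip().rstrip(';')}) AS {alias}"


url = build_mysql_jdbc_url("ds-2002-final-2.mysql.database.azure.com", 3306, "classicmodels")
table_arg = as_jdbc_subquery("SELECT productCode, productName FROM products;", alias="p")
print(url)
print(table_arg)
```

The resulting `table_arg` could then be passed as the last argument of `get_sql_dataframe()`.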


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    # Create a client connection to MongoDB Atlas using the credentials passed in
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.pfsh0cc.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB; an empty filter or missing projection returns all documents/fields
    db = client[db_name]
    cursor = db[collection].find(conditions or {}, projection or None)
    if sort:
        cursor = cursor.sort(sort)   # e.g., [("customerNumber", pymongo.ASCENDING)]

    # Fill a pandas DataFrame with the returned documents
    dframe = pd.DataFrame(list(cursor))

    client.close()
    
    return dframe
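
The Atlas connection strings in these functions are built by interpolating the user name and password directly. PyMongo's documentation notes that credentials containing characters such as `@`, `:` or `/` must be percent-escaped, for example with `urllib.parse.quote_plus`. The `build_atlas_uri` helper below is a hypothetical sketch of that approach; the `pfsh0cc.mongodb.net` domain is taken from the URIs used in this notebook, and the credentials in the example are made up.

```python
from urllib.parse import quote_plus


def build_atlas_uri(user_id: str, pwd: str, cluster_name: str, db_name: str,
                    cluster_domain: str = "pfsh0cc.mongodb.net") -> str:
    """Build a mongodb+srv:// URI with percent-escaped credentials (hypothetical helper)."""
    # quote_plus escapes '@', ':' and '/' so they are not parsed as URI delimiters.
    return (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
            f"@{cluster_name}.{cluster_domain}/{db_name}?retryWrites=true&w=majority")


uri = build_atlas_uri("student", "p@ss:word/1", "cluster0", "classicmodels")
print(uri)
```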

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, src_file_path, json_files):
    # Create a client connection to MongoDB Atlas
    mongo_uri = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.pfsh0cc.mongodb.net/{atlas_database_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[atlas_database_name]
    
    # For each JSON file, drop any existing collection of the same name and re-create it,
    # so re-running this function does not duplicate documents
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(src_file_path, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)

    client.close()
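
`set_mongo_collection()` passes the result of `json.load()` straight to `insert_many()`, which expects an iterable of documents. That works when the file holds a single JSON array, but a JSON Lines file (one object per line) would fail to parse. The stdlib-only sketch below is hypothetical: `load_json_documents` accepts either layout, and `replace_collection` mimics the drop-then-insert pattern against a plain `dict` standing in for the database, showing why running it twice does not duplicate documents. The actual layout of `customers.json` is not shown in this notebook.

```python
import json
import os
import tempfile


def load_json_documents(path: str) -> list:
    """Return a list of dicts from a file holding either a JSON array or JSON Lines."""
    with open(path, "r", encoding="utf-8") as fh:
        text = fh.read().strip()
    if text.startswith("["):
        docs = json.loads(text)  # a single JSON array of documents
    else:
        docs = [json.loads(line) for line in text.splitlines() if line.strip()]  # JSON Lines
    if not all(isinstance(d, dict) for d in docs):
        raise ValueError(f"{path} does not contain a list of JSON objects")
    return docs


def replace_collection(store: dict, name: str, docs: list) -> None:
    """Drop-then-insert into an in-memory stand-in for a database."""
    store.pop(name, None)                  # analogous to db.drop_collection(name)
    store[name] = [dict(d) for d in docs]  # analogous to db[name].insert_many(docs)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "customers.json")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write('{"customerNumber": 103}\n{"customerNumber": 112}\n')
    docs = load_json_documents(path)

store = {}
replace_collection(store, "customers", docs)
replace_collection(store, "customers", docs)  # running twice leaves the same contents
print(len(store["customers"]))
```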

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a Temporary View over an Azure MySQL Table.

In [None]:
%sql
DROP DATABASE IF EXISTS classicmodels CASCADE;

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-3002 Capstone Project");

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-final-2.mysql.database.azure.com:3306/classicmodels",
  dbtable "classicmodels.products",
  user "msqldatabricks",
  password "Lyella227"
)

In [None]:
%sql
USE classicmodels;

CREATE TABLE IF NOT EXISTS classicmodels.dim_products
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels/dim_products"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [None]:
%sql
SELECT * FROM classicmodels.dim_products LIMIT 5

productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front suspension, gear-shift lever, footbrake lever, drive chain, wheels and steering. All parts are particularly delicate due to their precise scale and require special care and attention.",7933,48.81,95.7
S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddle bags located on side of motorcycle, detailed engine, working steering, working suspension, two leather seats, luggage rack, dual exhaust pipes, small saddle bag located on handle bars, two-tone paint with chrome accents, superior die-cast detail , rotating wheels , working kick stand, diecast metal with plastic parts and baked enamel finish.",6625,68.99,118.94
S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos and insignias, detachable rear wheelie bar, heavy diecast metal with resin parts, authentic multi-color tampo-printed graphics, separate engine drive belts, free-turning front fork, rotating tires and rear racing slick, certificate of authenticity, detailed engine, display stand , precision diecast replica, baked enamel finish, 1:10 scale model, removable fender, seat and tank cover piece for displaying the superior detail of the v-twin engine",5582,91.02,193.66
S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0


In [None]:
%sql
DESCRIBE EXTENDED classicmodels.dim_products;

col_name,data_type,comment
productCode,string,
productName,string,
productLine,string,
productScale,string,
productVendor,string,
productDescription,string,
quantityInStock,int,
buyPrice,"decimal(10,2)",
MSRP,"decimal(10,2)",
,,


##### 1.2. Create a New Table that Sources its Data from a Table in an Azure MySQL Database.

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-final-2.mysql.database.azure.com:3306/classicmodels",
  dbtable "classicmodels.dim_date",
  user "msqldatabricks",
  password "Lyella227"
)

In [None]:
%sql
USE classicmodels;

CREATE TABLE IF NOT EXISTS classicmodels.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [None]:
%sql
SELECT * FROM classicmodels.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20030101,2003-01-01,2003/01/01,01/01/2003,01/01/2003,4,Wednesday,1,1,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030102,2003-01-02,2003/01/02,01/02/2003,02/01/2003,5,Thursday,2,2,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030103,2003-01-03,2003/01/03,01/03/2003,03/01/2003,6,Friday,3,3,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030104,2003-01-04,2003/01/04,01/04/2003,04/01/2003,7,Saturday,4,4,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030105,2003-01-05,2003/01/05,01/05/2003,05/01/2003,1,Sunday,5,5,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
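
The sample rows suggest how several `dim_date` attributes relate to `full_date`: `date_key` is the date as a `yyyymmdd` integer, `day_of_week` counts from Sunday = 1, and January falls in fiscal month 7, fiscal quarter 3 of fiscal year 2003. The sketch below reproduces those values in Python under an assumption inferred only from these rows, namely that the fiscal year starts on July 1 and is named after the calendar year in which it ends; how the source MySQL table actually computes these columns is not shown here.

```python
from datetime import date


def date_attributes(d: date) -> dict:
    """Derive a few dim_date-style attributes (assumes the fiscal year starts July 1)."""
    fiscal_month = (d.month + 5) % 12 + 1  # July -> 1, January -> 7
    return {
        "date_key": d.year * 10000 + d.month * 100 + d.day,  # e.g. 20030101
        "day_of_week": d.isoweekday() % 7 + 1,                # Sunday = 1 ... Saturday = 7
        "day_name_of_week": d.strftime("%A"),
        "weekday_weekend": "Weekend" if d.isoweekday() >= 6 else "Weekday",
        "calendar_year_qtr": f"{d.year}Q{(d.month - 1) // 3 + 1}",
        "fiscal_month_of_year": fiscal_month,
        "fiscal_quarter": (fiscal_month - 1) // 3 + 1,
        "fiscal_year": d.year + 1 if d.month >= 7 else d.year,  # named by the year it ends
    }


print(date_attributes(date(2003, 1, 1)))
```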


In [None]:
%sql
DESCRIBE EXTENDED classicmodels.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [None]:
display(dbutils.fs.ls(batch_dir))

path,name,size
dbfs:/FileStore/ds3002-capstone/source_data/batch/customers.json,customers.json,47574
dbfs:/FileStore/ds3002-capstone/source_data/batch/payments.csv,payments.csv,8994


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [None]:
src_file_path = '/dbfs/FileStore/ds3002-capstone/source_data/batch'
json_files = {"customers" : 'customers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, src_file_path, json_files)

##### 2.3. Fetch Data from the New MongoDB Collection

In [None]:
%scala
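import com.mongodb.spark._

// Assumes the cluster's Spark configuration supplies the Atlas connection string (spark.mongodb.input.uri)
val df_customer = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "classicmodels")
  .option("collection", "customers")
  .load()
display(df_customer)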

_id,addressLine1,addressLine2,city,contactFirstName,contactLastName,country,creditLimit,customerName,customerNumber,phone,postalCode,salesRepEmployeeNumber,state
List(639a773830addd55554cce73),"54, rue Royale",,Nantes,Carine,Schmitt,France,21000.0,Atelier graphique,103,40.32.2555,44000,1370.0,
List(639a773830addd55554cce74),8489 Strong St.,,Las Vegas,Jean,King,USA,71800.0,Signal Gift Stores,112,7025551838,83030,1166.0,NV
List(639a773830addd55554cce75),636 St Kilda Road,Level 3,Melbourne,Peter,Ferguson,Australia,117300.0,"Australian Collectors, Co.",114,03 9520 4555,3004,1611.0,Victoria
List(639a773830addd55554cce76),"67, rue des Cinquante Otages",,Nantes,Janine,Labrune,France,118200.0,La Rochelle Gifts,119,40.67.8555,44000,1370.0,
List(639a773830addd55554cce77),Erling Skakkes gate 78,,Stavern,Jonas,Bergulfsen,Norway,81700.0,Baane Mini Imports,121,07-98 9555,4110,1504.0,
List(639a773830addd55554cce78),5677 Strong St.,,San Rafael,Susan,Nelson,USA,210500.0,Mini Gifts Distributors Ltd.,124,4155551450,97562,1165.0,CA
List(639a773830addd55554cce79),ul. Filtrowa 68,,Warszawa,Zbyszek,Piestrzeniewicz,Poland,0.0,Havel & Zbyszek Co,125,(26) 642-7555,01-012,,
List(639a773830addd55554cce7a),Lyonerstr. 34,,Frankfurt,Roland,Keitel,Germany,59700.0,"Blauer See Auto, Co.",128,+49 69 66 90 2555,60528,1504.0,
List(639a773830addd55554cce7b),5557 North Pendale Street,,San Francisco,Julie,Murphy,USA,64600.0,Mini Wheels Co.,129,6505555787,94217,1165.0,CA
List(639a773830addd55554cce7c),897 Long Airport Avenue,,NYC,Kwai,Lee,USA,114900.0,Land of Toys Inc.,131,2125557818,10022,1323.0,NY


In [None]:
%scala
df_customer.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (classicmodels) Metadata Database

In [None]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("classicmodels.dim_customer")

In [None]:
%sql
DESCRIBE EXTENDED classicmodels.dim_customer

col_name,data_type,comment
_id,struct,
addressLine1,string,
addressLine2,string,
city,string,
contactFirstName,string,
contactLastName,string,
country,string,
creditLimit,double,
customerName,string,
customerNumber,int,


##### 2.5. Query the New Table in the Databricks Metadata Database

In [None]:
%sql
SELECT * FROM classicmodels.dim_customer LIMIT 5

_id,addressLine1,addressLine2,city,contactFirstName,contactLastName,country,creditLimit,customerName,customerNumber,phone,postalCode,salesRepEmployeeNumber,state
List(639a773830addd55554cce73),"54, rue Royale",,Nantes,Carine,Schmitt,France,21000.0,Atelier graphique,103,40.32.2555,44000,1370,
List(639a773830addd55554cce74),8489 Strong St.,,Las Vegas,Jean,King,USA,71800.0,Signal Gift Stores,112,7025551838,83030,1166,NV
List(639a773830addd55554cce75),636 St Kilda Road,Level 3,Melbourne,Peter,Ferguson,Australia,117300.0,"Australian Collectors, Co.",114,03 9520 4555,3004,1611,Victoria
List(639a773830addd55554cce76),"67, rue des Cinquante Otages",,Nantes,Janine,Labrune,France,118200.0,La Rochelle Gifts,119,40.67.8555,44000,1370,
List(639a773830addd55554cce77),Erling Skakkes gate 78,,Stavern,Jonas,Bergulfsen,Norway,81700.0,Baane Mini Imports,121,07-98 9555,4110,1504,


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [None]:
payments_csv = f"{batch_dir}/payments.csv"

df_payments = spark.read.format('csv').options(header='true', inferSchema='true').load(payments_csv)
display(df_payments)

customerNumber,checkNumber,paymentDate,amount
103,HQ336336,2004-10-19,6066.78
103,JM555205,2003-06-05,14571.44
103,OM314933,2004-12-18,1676.14
112,BO864823,2004-12-17,14191.12
112,HQ55022,2003-06-06,32641.98
112,ND748579,2004-08-20,33347.88
114,GG31455,2003-05-20,45864.03
114,MA765515,2004-12-15,82261.22
114,NP603840,2003-05-31,7565.08
114,NR27552,2004-03-10,44894.74
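
`DESCRIBE EXTENDED` on the resulting table reports `paymentDate` as a `string`, even though its values look like dates. The toy inference below (a simplification, not Spark's actual `inferSchema` algorithm) illustrates the idea: it tries `int`, then `float`, and otherwise falls back to a string, so ISO dates stay strings unless they are parsed explicitly.

```python
import csv
import io
from datetime import date


def infer_type(values):
    """Toy inference: 'int' if every value parses as int, else 'double', else 'string'."""
    for caster, name in ((int, "int"), (float, "double")):
        try:
            for v in values:
                caster(v)
            return name
        except ValueError:
            continue
    return "string"


sample = io.StringIO(
    "customerNumber,checkNumber,paymentDate,amount\n"
    "103,HQ336336,2004-10-19,6066.78\n"
    "112,BO864823,2004-12-17,14191.12\n"
)
rows = list(csv.DictReader(sample))
schema = {column: infer_type([r[column] for r in rows]) for column in rows[0]}
print(schema)

# Dates left as strings can be parsed explicitly when a true date type is needed.
payment_dates = [date.fromisoformat(r["paymentDate"]) for r in rows]
print(payment_dates[0])
```

In PySpark, an explicit conversion could look like `df_payments.withColumn("paymentDate", to_date(col("paymentDate")))`, with `to_date` imported from `pyspark.sql.functions`.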


In [None]:
df_payments.printSchema()

In [None]:
df_payments.write.format("delta").mode("overwrite").saveAsTable("classicmodels.dim_payments")

In [None]:
%sql
DESCRIBE EXTENDED classicmodels.dim_payments;

col_name,data_type,comment
customerNumber,int,
checkNumber,string,
paymentDate,string,
amount,double,
,,
# Partitioning,,
Not partitioned,,
,,
# Detailed Table Information,,
Name,classicmodels.dim_payments,


In [None]:
%sql
SELECT * FROM classicmodels.dim_payments LIMIT 5;

customerNumber,checkNumber,paymentDate,amount
103,HQ336336,2004-10-19,6066.78
103,JM555205,2003-06-05,14571.44
103,OM314933,2004-12-18,1676.14
112,BO864823,2004-12-17,14191.12
112,HQ55022,2003-06-06,32641.98


##### Verify Dimension Tables

In [None]:
%sql
USE classicmodels;
SHOW TABLES

database,tableName,isTemporary
classicmodels,dim_customer,False
classicmodels,dim_date,False
classicmodels,dim_payments,False
classicmodels,dim_products,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True
,display_query_6,True


#### 4.0. Create the Order Details Table

In [None]:
%sql

CREATE OR REPLACE TEMPORARY VIEW view_orderdetails
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-final-2.mysql.database.azure.com:3306/classicmodels",
  dbtable "classicmodels.orderdetails",
  user "msqldatabricks",
  password "Lyella227"
)

In [None]:
%sql
SELECT * FROM view_orderdetails LIMIT 5

orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
10100,S18_1749,30,136.0,3
10100,S18_2248,50,55.09,2
10100,S18_4409,22,75.46,4
10100,S24_3969,49,35.29,1
10101,S18_2325,25,108.06,4


In [None]:
%sql
USE classicmodels;

CREATE TABLE IF NOT EXISTS classicmodels.order_details
COMMENT "Order Details Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels/orderdetails"
AS SELECT * FROM view_orderdetails

num_affected_rows,num_inserted_rows


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [None]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # Combine all hints in one comma-separated string; repeating the option would keep only the last value
 .option("cloudFiles.schemaHints",
         "orderNumber BIGINT, orderDate STRING, requiredDate STRING, shippedDate STRING, "
         "status STRING, comments STRING, customerNumber BIGINT")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))
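
Spark reader options behave like a key-value map: calling `.option()` again with the same key replaces the earlier value, so a chain of `cloudFiles.schemaHints` options keeps only the last hint. The plain-Python sketch below models the options as a `dict` to show that effect, then builds a single comma-separated hint string of the form `column TYPE, column TYPE` that could be passed in one `.option("cloudFiles.schemaHints", ...)` call.

```python
# Model reader options as a dict: a repeated key keeps only the last value.
options = {}
for hint in ["orderNumber BIGINT", "orderDate STRING", "customerNumber BIGINT"]:
    options["cloudFiles.schemaHints"] = hint  # each assignment overwrites the previous one
print(options["cloudFiles.schemaHints"])

# One comma-separated string keeps a hint for every column.
column_hints = {
    "orderNumber": "BIGINT",
    "orderDate": "STRING",
    "requiredDate": "STRING",
    "shippedDate": "STRING",
    "status": "STRING",
    "comments": "STRING",
    "customerNumber": "BIGINT",
}
schema_hints = ", ".join(f"{name} {dtype}" for name, dtype in column_hints.items())
print(schema_hints)
```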

In [None]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [None]:
%sql
SELECT * FROM orders_bronze_tempview

comments,customerNumber,orderDate,orderNumber,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
,363,2003-01-06,10100,2003-01-13,2003-01-10,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
Check on availability.,128,2003-01-09,10101,2003-01-18,2003-01-11,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,181,2003-01-10,10102,2003-01-18,2003-01-14,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,121,2003-01-29,10103,2003-02-07,2003-02-02,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,141,2003-01-31,10104,2003-02-09,2003-02-01,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,145,2003-02-11,10105,2003-02-21,2003-02-12,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,278,2003-02-17,10106,2003-02-24,2003-02-21,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
Difficult to negotiate with customer. We need more marketing materials,131,2003-02-24,10107,2003-03-03,2003-02-26,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,385,2003-03-03,10108,2003-03-12,2003-03-08,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
Customer requested that FedEx Ground is used for this shipping,486,2003-03-10,10109,2003-03-19,2003-03-11,Shipped,,2022-12-15T01:24:21.842+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json


In [None]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))
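
The bronze stream records its progress in the `_checkpoint` folder, which is what lets an append-mode stream avoid re-ingesting files it has already processed. The toy model below is a hypothetical sketch, not how Auto Loader actually stores its state: it keeps a JSON list of processed file names. Re-running with the checkpoint intact appends nothing new, while deleting the checkpoint but keeping the target appends the same file again, which is one way re-runs can produce duplicate rows.

```python
import json
import os
import tempfile


def process_new_files(source_files, checkpoint_path, sink):
    """Append files not yet recorded in the checkpoint, then record them."""
    seen = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, encoding="utf-8") as fh:
            seen = set(json.load(fh))
    new_files = [f for f in source_files if f not in seen]
    for f in new_files:
        sink.append(f)  # stand-in for appending that file's rows to the target table
    with open(checkpoint_path, "w", encoding="utf-8") as fh:
        json.dump(sorted(seen | set(new_files)), fh)
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "_checkpoint.json")
    sink = []
    process_new_files(["orders.json"], ckpt, sink)
    process_new_files(["orders.json"], ckpt, sink)  # re-run: nothing new, no duplicates
    print(len(sink))
    os.remove(ckpt)                                 # checkpoint deleted, target kept
    process_new_files(["orders.json"], ckpt, sink)  # the same file is appended again
    print(len(sink))
```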

##### 6.2. Silver Table: Include Reference Data

In [None]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [None]:
%sql
SELECT * FROM orders_silver_tempview

comments,customerNumber,orderDate,orderNumber,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
,363,2003-01-06,10100,2003-01-13,2003-01-10,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
Check on availability.,128,2003-01-09,10101,2003-01-18,2003-01-11,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,181,2003-01-10,10102,2003-01-18,2003-01-14,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,121,2003-01-29,10103,2003-02-07,2003-02-02,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,141,2003-01-31,10104,2003-02-09,2003-02-01,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,145,2003-02-11,10105,2003-02-21,2003-02-12,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,278,2003-02-17,10106,2003-02-24,2003-02-21,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
Difficult to negotiate with customer. We need more marketing materials,131,2003-02-24,10107,2003-03-03,2003-02-26,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
,385,2003-03-03,10108,2003-03-12,2003-03-08,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json
Customer requested that FedEx Ground is used for this shipping,486,2003-03-10,10109,2003-03-19,2003-03-11,Shipped,,2022-12-15T01:24:24.369+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/orders.json


In [None]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
comments,string,
customerNumber,bigint,
orderDate,string,
orderNumber,bigint,
requiredDate,string,
shippedDate,string,
status,string,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.orderNumber
    , t.requiredDate
    , t.shippedDate
    , t.status
    , CAST(od.priceEach AS INT) AS priceEach
    , CAST(pa.amount AS INT) AS amount
    
  FROM orders_silver_tempview t
  INNER JOIN classicmodels.order_details od
  ON t.orderNumber = od.orderNumber
  
  INNER JOIN classicmodels.dim_customer c
  ON t.customerNumber = CAST(c.customerNumber AS BIGINT)
  
  INNER JOIN classicmodels.dim_payments pa
  ON t.customerNumber = CAST(pa.customerNumber AS BIGINT)

  INNER JOIN classicmodels.dim_date da
  ON CAST(t.OrderDate AS DATE) = da.full_date)
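
The join above matches `dim_payments` on `customerNumber` only, while the fact rows are at the order-line grain, so every order line is paired with every payment made by that customer. The sketch below reproduces this with the values that appear in the `fact_orders_silver` output for order 10100 (customer 363): four order lines and three payment amounts yield twelve rows. Pre-aggregating payments to one row per customer is shown as one possible fix.

```python
from collections import defaultdict

# Values that appear in the fact_orders_silver output for order 10100 (customer 363).
order_lines = [  # (orderNumber, customerNumber, priceEach)
    (10100, 363, 136), (10100, 363, 55), (10100, 363, 75), (10100, 363, 35),
]
payments = [  # (customerNumber, amount)
    (363, 55425), (363, 10223), (363, 50799),
]

# Joining on customerNumber alone pairs every order line with every payment.
joined = [(o, c, p, amt) for (o, c, p) in order_lines for (pc, amt) in payments if pc == c]
print(len(joined))  # 4 lines x 3 payments

# One possible fix: aggregate payments to one row per customer before joining.
total_paid = defaultdict(int)
for cust, amt in payments:
    total_paid[cust] += amt
joined_once = [(o, c, p, total_paid[c]) for (o, c, p) in order_lines]
print(len(joined_once))  # one row per order line
```

The pre-aggregated version still repeats the customer-level total on every order line, so it should not be summed across lines; leaving payments out of the order-line fact table is another option.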

In [None]:
%sql
DESCRIBE EXTENDED fact_orders_silver_tempview

col_name,data_type,comment
orderNumber,bigint,
requiredDate,string,
shippedDate,string,
status,string,
priceEach,int,
amount,int,
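
The view casts `priceEach` and `amount` to `INT`, which drops their fractional part (in the output, a price of 55.09 appears as 55). The sketch below uses Python's `decimal` module and the four order-line rows for order 10100 from the `order_details` sample to compare an exact order total with one computed from truncated prices. Casting to a `DECIMAL` type such as `DECIMAL(10,2)` instead of `INT` would keep the cents.

```python
from decimal import Decimal

# Order 10100's lines from the order_details sample: (quantityOrdered, priceEach)
lines = [(30, Decimal("136.00")), (50, Decimal("55.09")), (22, Decimal("75.46")), (49, Decimal("35.29"))]

exact_total = sum(qty * price for qty, price in lines)
truncated_total = sum(qty * int(price) for qty, price in lines)  # int() drops the fractional part

print(exact_total)      # 10223.83
print(truncated_total)  # 10195
```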


In [None]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

In [None]:
# dbutils.fs.rm("dbfs:/FileStore/ds3002-capstone/classicmodels/fact_orders_silver", True)
# Uncomment the line above (or drop the table manually) if re-running the notebook leaves duplicate rows in
# fact_orders_silver, then re-run the writeStream cell above to re-create the fact_orders_silver table.
# Note: many of the repeated rows shown below come from the join itself rather than from re-runs. The view joins
# each order line to every payment made by the same customer (on customerNumber only), so each order line appears
# once per payment.

In [None]:
%sql
SELECT * FROM fact_orders_silver

orderNumber,requiredDate,shippedDate,status,priceEach,amount
10100,2003-01-13,2003-01-10,Shipped,136,55425
10100,2003-01-13,2003-01-10,Shipped,136,10223
10100,2003-01-13,2003-01-10,Shipped,136,50799
10100,2003-01-13,2003-01-10,Shipped,55,55425
10100,2003-01-13,2003-01-10,Shipped,55,10223
10100,2003-01-13,2003-01-10,Shipped,55,50799
10100,2003-01-13,2003-01-10,Shipped,75,55425
10100,2003-01-13,2003-01-10,Shipped,75,10223
10100,2003-01-13,2003-01-10,Shipped,75,50799
10100,2003-01-13,2003-01-10,Shipped,35,55425


In [None]:
%sql
DESCRIBE EXTENDED classicmodels.fact_orders_silver

col_name,data_type,comment
orderNumber,bigint,
requiredDate,string,
shippedDate,string,
status,string,
priceEach,int,
amount,int,
,,
# Partitioning,,
Not partitioned,,
,,


##### 6.3. Gold Table: Perform Aggregations

In [None]:
%sql
SELECT orderNumber AS quantity_ordered_price_100_or_greater 
FROM classicmodels.fact_orders_silver
WHERE priceEach > 100

quantity_ordered_price_100_or_greater
55425
10223
50799
7466
33820
24101
10549
7466
33820
24101



In [None]:
%sql
SELECT status, count(orderNumber) AS ProductCount
FROM classicmodels.fact_orders_silver
GROUP BY status

status,ProductCount
Shipped,10965
On Hold,76
Cancelled,357
Resolved,329
In Process,188
Disputed,100
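
`count(orderNumber)` counts rows, not orders, so in a table where each order appears many times (for example, once per order-line and payment combination) the per-status totals are inflated. The sketch below uses hypothetical rows to contrast a row count with a distinct-order count.

```python
from collections import Counter

# Hypothetical silver rows: (orderNumber, status); order 10100 repeats 12 times due to a join fan-out.
rows = [(10100, "Shipped")] * 12 + [(10101, "Shipped")] * 4 + [(20001, "On Hold")] * 2

rows_per_status = Counter(status for _, status in rows)         # like COUNT(orderNumber)
orders_per_status = Counter(status for _, status in set(rows))  # like COUNT(DISTINCT orderNumber)

print(rows_per_status)
print(orders_per_status)
```

In Spark SQL, the distinct version would be `SELECT status, COUNT(DISTINCT orderNumber) AS order_count FROM classicmodels.fact_orders_silver GROUP BY status`.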

