## Project 2: Capstone 

### Section I: Prerequisites

#### 1.0. Import Required Libraries


In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Secrets ######################################################
# Passwords are resolved at run time from a Databricks secret scope so that no
# credential is stored in the notebook source or its revision history.
# The scope and keys named here must be created once per workspace
# (e.g. with the Databricks CLI) before this cell is run.
secret_scope = "ds2002"

# Azure MySQL Server Connection Information ###################
jdbc_hostname = "gex7ys-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels_dw"

connection_properties = {
  "user" : "gdgurrola",
  "password" : dbutils.secrets.get(scope=secret_scope, key="mysql-password"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds2002cluster.kbzllxx" #"cluster_name.xxxxx"
atlas_database_name = "classicmodels_dw"
atlas_user_name = "gex7ys"
atlas_password = dbutils.secrets.get(scope=secret_scope, key="atlas-password")

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
# No trailing space: the directory on DBFS is ".../retail/batch".
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

# Medallion output locations for the orders fact stream.
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ##################################
# (fact_orders lives under database_dir, so the next delete also covers it;
# kept as an explicit step so the intent stays visible.)
dbutils.fs.rm(f"{database_dir}/fact_orders", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Parameters:
        user_id, pwd:  Atlas credentials embedded in the connection URI.
        cluster_name:  Atlas cluster host prefix ("<cluster>.<id>").
        db_name:       Database to query.
        collection:    Collection to query.
        conditions:    Query filter; a falsy value selects every document.
        projection:    Field projection; a falsy value returns all fields.
        sort:          Sort specification passed to Cursor.sort(); a falsy
                       value leaves the result unsorted.

    Returns:
        A `pd.DataFrame` built from the list of matching documents.

    Each of conditions / projection / sort is applied independently, so a
    filter is honoured even when no projection or sort is supplied.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)
    try:
        # find(None, None) is equivalent to find(): no filter, all fields.
        cursor = client[db_name][collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor while the connection is still open.
        documents = list(cursor)
    finally:
        # Always release the connection, even if the query fails.
        client.close()

    return pd.DataFrame(documents)

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files.

    Parameters:
        user_id, pwd:   Atlas credentials embedded in the connection URI.
        cluster_name:   Atlas cluster host prefix ("<cluster>.<id>").
        db_name:        Target database.
        src_file_path:  Directory that contains the JSON files.
        json_files:     Mapping of collection name -> JSON file name.

    Returns:
        The result of the last `insert_many` call, or None when `json_files`
        is empty.

    Each collection is dropped and reloaded, so the call can be repeated.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Defined up front so an empty mapping returns None instead of raising.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(src_file_path, file_name)

            # Read the file first: a missing or malformed file must not leave
            # the existing collection already dropped.
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even if a load fails part-way.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: drop the lakehouse database (CASCADE also drops its tables).
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [0]:
%sql
-- Create the lakehouse metadata database at an explicit DBFS location.
-- The LOCATION is the same path the notebook's `database_dir` variable resolves to.
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS-2002 Project 2 Database"
LOCATION "dbfs:/FileStore/lab_data/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project 2");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
-- Expose the MySQL `dim_date` table as a temporary view through Spark's JDBC data source.
-- NOTE(review): the user name and password below are stored in plain text in this cell;
-- consider rotating them and keeping them out of the notebook source.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://gex7ys-mysql.mysql.database.azure.com:3306/classicmodels_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "gdgurrola",    --Replace with your User Name
  password "Kareldotmove22!"  --Replace with your password
)

In [0]:
%sql
-- Make classicmodels_dlh the current database.
USE DATABASE classicmodels_dlh;

-- Materialize the JDBC-backed view_date as a lakehouse table (CTAS) at an explicit location.
CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column types and table metadata of the new date dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
-- Preview five rows of the date dimension.
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql

-- Expose the MySQL `dim_customers` table as a temporary view through Spark's JDBC data source.
-- NOTE(review): the user name and password below are stored in plain text in this cell;
-- consider rotating them and keeping them out of the notebook source.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://gex7ys-mysql.mysql.database.azure.com:3306/classicmodels_dw", --Replace with your Server Name
  dbtable "dim_customers",
  user "gdgurrola",    --Replace with your User Name
  password "Kareldotmove22!"  --Replace with your password
)

In [0]:
%sql
-- Make classicmodels_dlh the current database.
USE DATABASE classicmodels_dlh;

-- Materialize the JDBC-backed view_customer as a lakehouse table (CTAS) at an explicit location.
CREATE OR REPLACE TABLE classicmodels_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/classicmodels_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column types and table metadata of the customer dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
customerNumber,bigint,
customerName,varchar(65535),
contactLastName,varchar(65535),
contactFirstName,varchar(65535),
addressLine1,varchar(65535),
city,varchar(65535),
postalCode,varchar(65535),
country,varchar(65535),
salesRepEmployeeNumber,double,


In [0]:
%sql
-- Preview five rows of the customer dimension.
SELECT * FROM classicmodels_dlh.dim_customer LIMIT 5

customer_key,customerNumber,customerName,contactLastName,contactFirstName,addressLine1,city,postalCode,country,salesRepEmployeeNumber,creditLimit
1,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,44000,France,1370.0,21000.0
2,112,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,83030,USA,1166.0,71800.0
3,114,"Australian Collectors, Co.",Ferguson,Peter,636 St Kilda Road,Melbourne,3004,Australia,1611.0,117300.0
4,119,La Rochelle Gifts,Labrune,Janine,"67, rue des Cinquante Otages",Nantes,44000,France,1370.0,118200.0
5,121,Baane Mini Imports,Bergulfsen,Jonas,Erling Skakkes gate 78,Stavern,4110,Norway,1504.0,81700.0


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the batch source files that are available on DBFS.
batch_files = dbutils.fs.ls("dbfs:/FileStore/lab_data/retail/batch")
display(batch_files)

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/classicmodels_employees.json,classicmodels_employees.json,5206,1714874565000
dbfs:/FileStore/lab_data/retail/batch/classicmodels_orders.json,classicmodels_orders.json,70595,1714874565000
dbfs:/FileStore/lab_data/retail/batch/classicmodels_payments.csv,classicmodels_payments.csv,8994,1714874565000
dbfs:/FileStore/lab_data/retail/batch/classicmodels_payments.json,classicmodels_payments.json,8994,1714874565000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# Local (FUSE) path of the batch directory, as needed by Python's open().
source_dir = "/dbfs/FileStore/lab_data/retail/batch"

# Collection name -> JSON file to load into it.
json_files = {
    "employees": 'classicmodels_employees.json',
    "orders": 'classicmodels_orders.json',
}

set_mongo_collection(
    atlas_user_name,
    atlas_password,
    atlas_cluster_name,
    atlas_database_name,
    source_dir,
    json_files,
)

<pymongo.results.InsertManyResult at 0x7f97e05432c0>

##### 2.3.1. Fetch Employee Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// The Atlas password is read from a Databricks secret scope at run time so that
// it is not stored in the notebook source. The scope "ds2002" and the key
// "atlas-password" must exist in the workspace before this cell is run.
val userName = "gex7ys"
val pwd = dbutils.secrets.get(scope = "ds2002", key = "atlas-password")
val clusterName = "ds2002cluster.kbzllxx"

// Connection string used by the MongoDB Spark connector reads in the cells below.
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Columns kept from the MongoDB `employees` collection, in output order.
val employeeColumns = Seq("employeeNumber", "lastName", "firstName", "jobTitle", "extension", "email", "reportsTo")

// Read the collection through the MongoDB Spark connector and keep only those columns.
val df_employee = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "classicmodels_dw")
  .option("collection", "employees")
  .load()
  .select(employeeColumns.head, employeeColumns.tail: _*)

In [0]:
%scala
// Print the schema of the employee DataFrame read from MongoDB.
df_employee.printSchema()

In [0]:
%scala
// Persist the employee DataFrame as a Delta table, replacing any existing contents.
df_employee.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_employee")

In [0]:
%sql
-- Inspect the column types and table metadata of the employee dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_employee

col_name,data_type,comment
employeeNumber,int,
lastName,string,
firstName,string,
jobTitle,string,
extension,string,
email,string,
reportsTo,int,
,,
# Delta Statistics Columns,,
Column Names,"email, reportsTo, lastName, firstName, employeeNumber, jobTitle, extension",


In [0]:
%sql
-- Preview five rows of the employee dimension.
SELECT * FROM classicmodels_dlh.dim_employee LIMIT 5

employeeNumber,lastName,firstName,jobTitle,extension,email,reportsTo
1002,Murphy,Diane,President,x5800,dmurphy@classicmodelcars.com,
1056,Patterson,Mary,VP Sales,x4611,mpatterso@classicmodelcars.com,1002.0
1076,Firrelli,Jeff,VP Marketing,x9273,jfirrelli@classicmodelcars.com,1002.0
1088,Patterson,William,Sales Manager (APAC),x4871,wpatterson@classicmodelcars.com,1056.0
1102,Bondur,Gerard,Sale Manager (EMEA),x5408,gbondur@classicmodelcars.com,1056.0


##### 2.4.1. Fetch Order Data from the New MongoDB Collection

In [0]:
%scala

// Columns kept from the MongoDB `orders` collection, in output order.
val orderColumns = Seq("orderNumber", "orderDate", "requiredDate", "shippedDate", "status", "comments", "customerNumber")

// Read the collection through the MongoDB Spark connector and keep only those columns.
val df_order = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "classicmodels_dw")
  .option("collection", "orders")
  .load()
  .select(orderColumns.head, orderColumns.tail: _*)

In [0]:
%scala
// Print the schema of the order DataFrame read from MongoDB.
df_order.printSchema()

In [0]:
%scala
// Persist the order DataFrame as a Delta table, replacing any existing contents.
df_order.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_orders")

In [0]:
%sql
-- Inspect the column types and table metadata of the orders table.
DESCRIBE EXTENDED classicmodels_dlh.dim_orders

col_name,data_type,comment
orderNumber,int,
orderDate,string,
requiredDate,string,
shippedDate,string,
status,string,
comments,string,
customerNumber,int,
,,
# Delta Statistics Columns,,
Column Names,"orderNumber, orderDate, status, customerNumber, shippedDate, requiredDate, comments",


In [0]:
%sql
-- Preview five rows of the orders table.
SELECT * FROM classicmodels_dlh.dim_orders LIMIT 5

orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128
10102,2003-01-10,2003-01-18,2003-01-14,Shipped,,181
10103,2003-01-29,2003-02-07,2003-02-02,Shipped,,121
10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,141


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Location of the payments extract on DBFS.
payments_csv = "dbfs:/FileStore/lab_data/retail/batch/classicmodels_payments.csv"

# The first row holds column names; let Spark infer the column types.
csv_reader = (
    spark.read
         .option("header", "true")
         .option("inferSchema", "true")
)
df_payments = csv_reader.csv(payments_csv)
display(df_payments)

customerNumber,checkNumber,paymentDate,amount
103,HQ336336,2004-10-19,6066.78
103,JM555205,2003-06-05,14571.44
103,OM314933,2004-12-18,1676.14
112,BO864823,2004-12-17,14191.12
112,HQ55022,2003-06-06,32641.98
112,ND748579,2004-08-20,33347.88
114,GG31455,2003-05-20,45864.03
114,MA765515,2004-12-15,82261.22
114,NP603840,2003-05-31,7565.08
114,NR27552,2004-03-10,44894.74


In [0]:
# Print the schema of the payments DataFrame.
df_payments.printSchema()

root
 |-- customerNumber: integer (nullable = true)
 |-- checkNumber: string (nullable = true)
 |-- paymentDate: date (nullable = true)
 |-- amount: double (nullable = true)



In [0]:
# Persist the payments DataFrame as a Delta table, replacing any existing contents.
df_payments.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_payments")

In [0]:
%sql
-- Inspect the column types and table metadata of the payments table.
DESCRIBE EXTENDED classicmodels_dlh.dim_payments;

col_name,data_type,comment
customerNumber,int,
checkNumber,string,
paymentDate,date,
amount,double,
,,
# Delta Statistics Columns,,
Column Names,"customerNumber, checkNumber, paymentDate, amount",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
-- Preview five rows of the payments table.
SELECT * FROM classicmodels_dlh.dim_payments LIMIT 5;

customerNumber,checkNumber,paymentDate,amount
103,HQ336336,2004-10-19,6066.78
103,JM555205,2003-06-05,14571.44
103,OM314933,2004-12-18,1676.14
112,BO864823,2004-12-17,14191.12
112,HQ55022,2003-06-06,32641.98


##### Verify Dimension Tables

In [0]:
%sql
-- Switch to the lakehouse database and list its tables.
USE classicmodels_dlh;
SHOW TABLES

database,tableName,isTemporary
classicmodels_dlh,dim_customer,False
classicmodels_dlh,dim_date,False
classicmodels_dlh,dim_employee,False
classicmodels_dlh,dim_orders,False
classicmodels_dlh,dim_payments,False
,display_query_1,True
,display_query_2,True
,fact_orders,True
,fact_orders_silver_tempview,True
,orders_bronze_tempview,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Auto Loader settings for the raw orders JSON files.
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": orders_output_bronze,
    "cloudFiles.inferColumnTypes": "true",
    "multiLine": "true",
}

# Open a streaming read over the orders directory and expose it to SQL cells.
orders_raw_stream = (
    spark.readStream
         .format("cloudFiles")
         .options(**autoloader_options)
         .load(orders_stream_dir)
)
orders_raw_stream.createOrReplaceTempView("orders_raw_tempview")

In [0]:
%sql
-- Bronze view: every raw order column plus traceability metadata
-- (when the row was received and which file it came from).
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS
SELECT
  *,
  current_timestamp() AS receipt_time,
  input_file_name()   AS source_file
FROM orders_raw_tempview

In [0]:
%sql
-- Display the rows of the bronze view.
SELECT * FROM orders_bronze_tempview

comments,customerNumber,orderDate,orderNumber,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
,363,2003-01-06,10100,2003-01-13,2003-01-10,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
Check on availability.,128,2003-01-09,10101,2003-01-18,2003-01-11,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,181,2003-01-10,10102,2003-01-18,2003-01-14,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,121,2003-01-29,10103,2003-02-07,2003-02-02,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,141,2003-01-31,10104,2003-02-09,2003-02-01,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,145,2003-02-11,10105,2003-02-21,2003-02-12,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,278,2003-02-17,10106,2003-02-24,2003-02-21,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
Difficult to negotiate with customer. We need more marketing materials,131,2003-02-24,10107,2003-03-03,2003-02-26,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,385,2003-03-03,10108,2003-03-12,2003-03-08,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
Customer requested that FedEx Ground is used for this shipping,486,2003-03-10,10109,2003-03-19,2003-03-11,Shipped,,2024-05-05T04:17:23.33Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json


In [0]:
# Checkpoint directory for the bronze stream.
bronze_checkpoint = f"{orders_output_bronze}/_checkpoint"

# Append the bronze view to a Delta table as a streaming write.
bronze_writer = (
    spark.table("orders_bronze_tempview")
         .writeStream
         .format("delta")
         .outputMode("append")
         .option("checkpointLocation", bronze_checkpoint)
)
bronze_writer.table("fact_orders_bronze")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f97d2d2efe0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Stream the bronze Delta table back in and expose it to SQL cells.
bronze_stream = spark.readStream.table("fact_orders_bronze")
bronze_stream.createOrReplaceTempView("orders_silver_tempview")

In [0]:
%sql
-- Display the rows of the silver source view.
SELECT * FROM orders_silver_tempview

comments,customerNumber,orderDate,orderNumber,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
,363,2003-01-06,10100,2003-01-13,2003-01-10,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
Check on availability.,128,2003-01-09,10101,2003-01-18,2003-01-11,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,181,2003-01-10,10102,2003-01-18,2003-01-14,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,121,2003-01-29,10103,2003-02-07,2003-02-02,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,141,2003-01-31,10104,2003-02-09,2003-02-01,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,145,2003-02-11,10105,2003-02-21,2003-02-12,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,278,2003-02-17,10106,2003-02-24,2003-02-21,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
Difficult to negotiate with customer. We need more marketing materials,131,2003-02-24,10107,2003-03-03,2003-02-26,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
,385,2003-03-03,10108,2003-03-12,2003-03-08,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
Customer requested that FedEx Ground is used for this shipping,486,2003-03-10,10109,2003-03-19,2003-03-11,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json


In [0]:
%sql
-- Inspect the column types of the silver source view.
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
comments,string,
customerNumber,bigint,
orderDate,string,
orderNumber,bigint,
requiredDate,string,
shippedDate,string,
status,string,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
-- Silver view: enrich each order with its customer and that customer's sales rep.
-- Orders without a matching customer are dropped (inner join); customers whose
-- sales rep has no employee row are kept with NULL employee columns (left join).
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT
      ord.orderNumber              AS order_id
    , ord.customerNumber           AS customer_id
    , cust.customer_key
    , cust.customerName            AS customer_name
    , cust.contactLastName         AS contact_last_name
    , cust.contactFirstName        AS contact_first_name
    , cust.addressLine1            AS customer_address
    , cust.city                    AS customer_city
    , cust.postalCode              AS customer_postal_code
    , cust.country                 AS customer_country
    , cust.salesRepEmployeeNumber  AS sales_rep_id
    , emp.employeeNumber           AS employee_id
    , emp.lastName                 AS employee_last_name
    , emp.firstName                AS employee_first_name
    , emp.jobTitle                 AS employee_job_title
    , ord.comments
    , ord.orderDate
    , ord.requiredDate
    , ord.shippedDate
    , ord.status
    , ord._rescued_data
    , ord.receipt_time
    , ord.source_file
  FROM orders_silver_tempview AS ord
  INNER JOIN dim_customer AS cust
    ON ord.customerNumber = cust.customerNumber
  LEFT JOIN dim_employee AS emp
    ON cust.salesRepEmployeeNumber = emp.employeeNumber
)


In [0]:
# Checkpoint directory for the silver stream.
silver_checkpoint = f"{orders_output_silver}/_checkpoint"

# Append the enriched silver view to a Delta table as a streaming write.
silver_writer = (
    spark.table("fact_orders_silver_tempview")
         .writeStream
         .format("delta")
         .outputMode("append")
         .option("checkpointLocation", silver_checkpoint)
)
silver_writer.table("fact_orders_silver")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f97d2d2d240>

In [0]:
%sql
-- Display the rows written to the silver table.
SELECT * FROM fact_orders_silver

order_id,customer_id,customer_key,customer_name,contact_last_name,contact_first_name,customer_address,customer_city,customer_postal_code,customer_country,sales_rep_id,employee_id,employee_last_name,employee_first_name,employee_job_title,comments,orderDate,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
10345,103,1,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,44000,France,1370.0,1370,Hernandez,Gerard,Sales Rep,,2004-11-25,2004-12-01,2004-11-26,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10346,112,2,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,83030,USA,1166.0,1166,Thompson,Leslie,Sales Rep,,2004-11-29,2004-12-05,2004-11-30,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10347,114,3,"Australian Collectors, Co.",Ferguson,Peter,636 St Kilda Road,Melbourne,3004,Australia,1611.0,1611,Fixter,Andy,Sales Rep,Can we deliver the new Ford Mustang models by end-of-quarter?,2004-11-29,2004-12-07,2004-11-30,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10425,119,4,La Rochelle Gifts,Labrune,Janine,"67, rue des Cinquante Otages",Nantes,44000,France,1370.0,1370,Hernandez,Gerard,Sales Rep,,2005-05-31,2005-06-07,,In Process,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10325,121,5,Baane Mini Imports,Bergulfsen,Jonas,Erling Skakkes gate 78,Stavern,4110,Norway,1504.0,1504,Jones,Barry,Sales Rep,,2004-11-05,2004-11-13,2004-11-08,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10421,124,6,Mini Gifts Distributors Ltd.,Nelson,Susan,5677 Strong St.,San Rafael,97562,USA,1165.0,1165,Jennings,Leslie,Sales Rep,Custom shipping instructions were sent to warehouse,2005-05-29,2005-06-06,,In Process,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10323,128,8,"Blauer See Auto, Co.",Keitel,Roland,Lyonerstr. 34,Frankfurt,60528,Germany,1504.0,1504,Jones,Barry,Sales Rep,,2004-11-05,2004-11-12,2004-11-09,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10333,129,9,Mini Wheels Co.,Murphy,Julie,5557 North Pendale Street,San Francisco,94217,USA,1165.0,1165,Jennings,Leslie,Sales Rep,,2004-11-18,2004-11-27,2004-11-20,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10329,131,10,Land of Toys Inc.,Lee,Kwai,897 Long Airport Avenue,NYC,10022,USA,1323.0,1323,Vanauf,George,Sales Rep,,2004-11-15,2004-11-24,2004-11-16,Shipped,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json
10424,141,11,Euro+ Shopping Channel,Freyre,Diego,"C/ Moralzarzal, 86",Madrid,28034,Spain,1370.0,1370,Hernandez,Gerard,Sales Rep,,2005-05-31,2005-06-08,,In Process,,2024-05-05T04:17:32.605Z,dbfs:/FileStore/lab_data/retail/stream/orders/classicmodels_orders.json


In [0]:
%sql
-- Inspect the column types and table metadata of the silver table.
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
order_id,bigint,
customer_id,bigint,
customer_key,bigint,
customer_name,varchar(65535),
contact_last_name,varchar(65535),
contact_first_name,varchar(65535),
customer_address,varchar(65535),
customer_city,varchar(65535),
customer_postal_code,varchar(65535),
customer_country,varchar(65535),


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach.

In [0]:
%sql
-- Gold aggregate: number of orders handled by each sales representative.
-- NOTE(review): the table name says "monthly ... by customer", but the query
-- aggregates per sales rep over all dates; the name is kept unchanged because
-- it may be referenced elsewhere.
CREATE OR REPLACE TABLE classicmodels_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT
    employee_last_name AS LastName,
    employee_first_name AS FirstName,
    employee_job_title AS JobTitle,
    COUNT(order_id) AS totalOrdersHandled
  FROM classicmodels_dlh.fact_orders_silver
  WHERE employee_job_title = 'Sales Rep'
  -- Group on the source columns rather than a mix of aliases and source names.
  GROUP BY employee_last_name, employee_first_name, employee_job_title
);

-- Sort when reading: row order stored by a CTAS is not guaranteed on a later
-- SELECT. The name columns break ties so the ranking is deterministic.
SELECT *
FROM classicmodels_dlh.fact_monthly_orders_by_customer_gold
ORDER BY totalOrdersHandled DESC, LastName, FirstName;


LastName,FirstName,JobTitle,totalOrdersHandled
Hernandez,Gerard,Sales Rep,43
Jennings,Leslie,Sales Rep,34
Castillo,Pamela,Sales Rep,31
Jones,Barry,Sales Rep,25
Bott,Larry,Sales Rep,22
Vanauf,George,Sales Rep,22
Bondur,Loui,Sales Rep,20
Fixter,Andy,Sales Rep,19
Marsh,Peter,Sales Rep,19
Patterson,Steve,Sales Rep,18
