### Capstone Project
      By: Erin Moulton
#### Import Required Libraries 

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "jmq3jk-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels_dw2"

# Credentials may be supplied through environment variables so they do not have to
# live in the notebook source. The literal fallbacks keep existing runs unchanged.
# NOTE(review): remove the fallbacks (and rotate the passwords) once the variables,
# or a Databricks secret scope, are provisioned.
connection_properties = {
  "user" : os.environ.get("MYSQL_USER", "emoulton"),
  "password" : os.environ.get("MYSQL_PASSWORD", "Passw0rd"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.wfbnlq4"
atlas_database_name = "classicmodels_dw2"
atlas_user_name = os.environ.get("ATLAS_USER", "emoulton_admin")
atlas_password = os.environ.get("ATLAS_PASSWORD", "Passw0rd")

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dlh"

base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

# Bronze / silver / gold output locations for the streaming fact table.
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files and the Database Files ############
# The fact_orders output (including checkpoints) lives under database_dir, so a
# single recursive delete removes both the streaming files and the database files.
dbutils.fs.rm(database_dir, True)

True

#### Define Global Functions

In [0]:

def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the documents as a DataFrame.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix (the part before ``.mongodb.net``).
        db_name: Database to connect to and query.
        collection: Name of the collection to read.
        conditions: Filter document for ``find``; a falsy value means "no filter".
        projection: Projection for ``find``; a falsy value means "all fields".
        sort: Sort specification for ``Cursor.sort``; a falsy value means "unsorted".

    Returns:
        A ``pd.DataFrame`` built from the list of returned documents.
    """
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Each argument is honoured independently. Previously a filter supplied
        # without a projection (or a sort supplied without both) was silently
        # dropped and the whole collection was returned.
        cursor = client[db_name][collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)

        # Drain the cursor while the connection is still open.
        return pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even when the query raises.
        client.close()

def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB collections from JSON files.

    For every ``collection name -> file name`` entry in ``json_files`` the
    collection is dropped, the JSON file is loaded from ``src_file_path`` and its
    documents are inserted with ``insert_many``.

    Args:
        user_id: Atlas database user name.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix (the part before ``.mongodb.net``).
        db_name: Target database.
        src_file_path: Directory that contains the JSON files.
        json_files: Mapping of collection name to JSON file name.

    Returns:
        The result of the last ``insert_many`` call, or ``None`` when
        ``json_files`` is empty (previously this raised ``UnboundLocalError``).
    """
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    result = None

    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            # Drop first so a re-run replaces the collection instead of appending to it.
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as json_handle:
                documents = json.load(json_handle)
            result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even when a file is missing or an insert fails.
        client.close()

    return result

### Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### Fetch Reference Data From an Azure MySQL Database
##### Create the Lakehouse Database

In [0]:
%sql
-- Start from a clean slate: drop the lakehouse database and, via CASCADE, every table inside it.
DROP DATABASE IF EXISTS classicmodels_dlh CASCADE;

In [0]:
%sql
-- Recreate the lakehouse database at an explicit DBFS location and tag it with descriptive properties.
CREATE DATABASE IF NOT EXISTS classicmodels_dlh
COMMENT "DS-2002 Capstone Project Database"
LOCATION "dbfs:/FileStore/project_data/classicmodels_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project");

##### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL Database

In [0]:
%sql
-- Temporary view over the MySQL table `dim_date`, read through Spark's JDBC data source.
-- NOTE(review): the user name and password are embedded in the notebook source; consider a secret store.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS ( 
  url "jdbc:mysql://jmq3jk-mysql.mysql.database.azure.com:3306/classicmodels_dw2", 
  dbtable "dim_date", 
  user "emoulton",    
  password "Passw0rd"  
)

In [0]:
%sql
-- Make the lakehouse database the session default for the statements that follow.
USE DATABASE classicmodels_dlh;

-- Materialise the JDBC-backed view "view_date" as the table "classicmodels_dlh.dim_date" (CTAS).
CREATE OR REPLACE TABLE classicmodels_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project_data/classicmodels_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column definitions and table metadata of the new date dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


Select statement showing the dim_date dimension table populated through Azure MySQL:

In [0]:
%sql
-- Preview five rows of the date dimension.
SELECT * FROM classicmodels_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20030101,2003-01-01,2003/01/01,01/01/2003,01/01/2003,4,Wednesday,1,1,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030102,2003-01-02,2003/01/02,01/02/2003,02/01/2003,5,Thursday,2,2,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030103,2003-01-03,2003/01/03,01/03/2003,03/01/2003,6,Friday,3,3,Weekday,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030104,2003-01-04,2003/01/04,01/04/2003,04/01/2003,7,Saturday,4,4,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3
20030105,2003-01-05,2003/01/05,01/05/2003,05/01/2003,1,Sunday,5,5,Weekend,1,January,1,N,1,2003,2003-01,2003Q1,7,3,2003,2003-07,2003Q3


##### Create a New Table that Sources Product Dimension Data from an Azure MySQL Database

In [0]:
%sql
-- Temporary view over the MySQL table `dim_products`, read through Spark's JDBC data source.
-- NOTE(review): the user name and password are embedded in the notebook source; consider a secret store.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS ( 
  url "jdbc:mysql://jmq3jk-mysql.mysql.database.azure.com:3306/classicmodels_dw2",
  dbtable "dim_products", 
  user "emoulton",    
  password "Passw0rd"  
)

In [0]:
%sql
-- Make the lakehouse database the session default for the statements that follow.
USE DATABASE classicmodels_dlh;

-- Create a new table named "classicmodels_dlh.dim_products" using data from the view named "view_product"
CREATE OR REPLACE TABLE classicmodels_dlh.dim_products 
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/project_data/classicmodels_dlh/dim_products"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the column definitions and table metadata of the new product dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_products;

col_name,data_type,comment
product_key,bigint,
product_id,varchar(65535),
productName,varchar(65535),
productLine,varchar(65535),
productVendor,varchar(65535),
quantityInStock,bigint,
buyPrice,double,
MSRP,double,
,,
# Delta Statistics Columns,,


Select statement showing the product dimension table populated through Azure MySQL:

In [0]:
%sql
-- Preview five rows of the product dimension.
SELECT * FROM classicmodels_dlh.dim_products LIMIT 5

product_key,product_id,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP
1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7
2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3
3,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,Highway 66 Mini Classics,6625,68.99,118.94
4,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,Red Start Diecast,5582,91.02,193.66
5,S10_4757,1972 Alfa Romeo GTA,Classic Cars,Motor City Art Classics,3252,85.68,136.0


#### Fetch Reference Data from a MongoDB Atlas Database to create a New Customer Dimension Table (json file)

In [0]:
# List the files available in the batch source directory.
display(dbutils.fs.ls(batch_dir)) 

path,name,size,modificationTime
dbfs:/FileStore/project_data/retail/batch/Classicmodels_DimCustomers.json,Classicmodels_DimCustomers.json,43001,1715023057000
dbfs:/FileStore/project_data/retail/batch/Classicmodels_DimProductlines.csv,Classicmodels_DimProductlines.csv,128,1715023057000
dbfs:/FileStore/project_data/retail/batch/classicmodels_products.json,classicmodels_products.json,50282,1715023058000


In [0]:
# pymongo reads the file with Python's open(), so the path must use the local
# "/dbfs" mount instead of the "dbfs:" scheme. Deriving it from batch_dir keeps
# this cell in step with the configuration cell.
source_dir = batch_dir.replace("dbfs:", "/dbfs", 1)

# Collection name -> JSON file to load into that collection.
json_files = {"customers" : 'Classicmodels_DimCustomers.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

<pymongo.results.InsertManyResult at 0x7f1894188280>

In [0]:
%scala
import com.mongodb.spark._

// Build the MongoDB Atlas connection URI that the Spark read in the next cell uses.
// NOTE(review): these values repeat the Python globals and keep the password in plain text.
val userName = "emoulton_admin"
val pwd = "Passw0rd"
val clusterName = "Cluster0.wfbnlq4"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Read the "customers" collection of the "classicmodels_dw2" database through the
// MongoDB Spark connector and keep only the columns needed for the customer dimension.
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "classicmodels_dw2")
.option("collection", "customers").load()
.select("customer_key","customerName","contactLastName","contactFirstName","addressLine1","city","state","postalcode","country","salesRepEmployeeNumber","creditLimit")

display(df_customer)

customer_key,customerName,contactLastName,contactFirstName,addressLine1,city,state,postalcode,country,salesRepEmployeeNumber,creditLimit
1,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,,44000,France,1370.0,21000
2,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,NV,83030,USA,1166.0,71800
3,"Australian Collectors, Co.",Ferguson,Peter,636 St Kilda Road,Melbourne,Victoria,3004,Australia,1611.0,117300
4,La Rochelle Gifts,Labrune,Janine,"67, rue des Cinquante Otages",Nantes,,44000,France,1370.0,118200
5,Baane Mini Imports,Bergulfsen,Jonas,Erling Skakkes gate 78,Stavern,,4110,Norway,1504.0,81700
6,Mini Gifts Distributors Ltd.,Nelson,Susan,5677 Strong St.,San Rafael,CA,97562,USA,1165.0,210500
7,Havel & Zbyszek Co,Piestrzeniewicz,Zbyszek,ul. Filtrowa 68,Warszawa,,01-012,Poland,,0
8,"Blauer See Auto, Co.",Keitel,Roland,Lyonerstr. 34,Frankfurt,,60528,Germany,1504.0,59700
9,Mini Wheels Co.,Murphy,Julie,5557 North Pendale Street,San Francisco,CA,94217,USA,1165.0,64600
10,Land of Toys Inc.,Lee,Kwai,897 Long Airport Avenue,NYC,NY,10022,USA,1323.0,114900


In [0]:
%scala
// Show the schema Spark derived for the customer DataFrame.
df_customer.printSchema()

##### Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database

In [0]:
%scala
// Persist the customer DataFrame as the Delta table classicmodels_dlh.dim_customer, replacing any previous contents.
df_customer.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_customer")

In [0]:
%sql
-- Inspect the column definitions and table metadata of the new customer dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
addressLine1,string,
city,string,
state,string,
postalcode,string,
country,string,
salesRepEmployeeNumber,int,


Select statement showing the customer dimension table populated through MongoDB Atlas (originally loaded from a JSON file):

In [0]:
%sql
-- Preview five rows of the customer dimension.
SELECT * FROM classicmodels_dlh.dim_customer LIMIT 5

customer_key,customerName,contactLastName,contactFirstName,addressLine1,city,state,postalcode,country,salesRepEmployeeNumber,creditLimit
1,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,,44000,France,1370,21000
2,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,NV,83030,USA,1166,71800
3,"Australian Collectors, Co.",Ferguson,Peter,636 St Kilda Road,Melbourne,Victoria,3004,Australia,1611,117300
4,La Rochelle Gifts,Labrune,Janine,"67, rue des Cinquante Otages",Nantes,,44000,France,1370,118200
5,Baane Mini Imports,Bergulfsen,Jonas,Erling Skakkes gate 78,Stavern,,4110,Norway,1504,81700


#### Fetch Data from a File System (csv) to create a new Product Lines Dimension Table 

In [0]:
# Location of the product-line reference file inside the batch directory.
productlines_csv = f"{batch_dir}/Classicmodels_DimProductlines.csv"

# Read the CSV, treating the first row as a header and letting Spark infer column types.
df_productlines = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(productlines_csv)
)

display(df_productlines)

productlines_key,productlines_id
1,Classic Cars
2,Motorcycles
3,Planes
4,Ships
5,Trains
6,Trucks and Buses
7,Vintage Cars


In [0]:
# Show the schema produced by the CSV read.
df_productlines.printSchema()

root
 |-- productlines_key: integer (nullable = true)
 |-- productlines_id: string (nullable = true)



In [0]:
# Persist the product-line DataFrame as the Delta table classicmodels_dlh.dim_productlines, replacing any previous contents.
df_productlines.write.format("delta").mode("overwrite").saveAsTable("classicmodels_dlh.dim_productlines")

In [0]:
%sql
-- Inspect the column definitions and table metadata of the new product-lines dimension.
DESCRIBE EXTENDED classicmodels_dlh.dim_productlines;

col_name,data_type,comment
productlines_key,int,
productlines_id,string,
,,
# Delta Statistics Columns,,
Column Names,"productlines_key, productlines_id",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,classicmodels_dlh,


Select statement showing the product lines dimension table fetched from a csv:

In [0]:
%sql
-- Preview five rows of the product-lines dimension.
SELECT * FROM classicmodels_dlh.dim_productlines LIMIT 5;

productlines_key,productlines_id
1,Classic Cars
2,Motorcycles
3,Planes
4,Ships
5,Trains


##### Verify Dimension Tables

In [0]:
%sql
-- Switch to the lakehouse database and list its tables to confirm the dimensions exist.
USE classicmodels_dlh;
SHOW TABLES

database,tableName,isTemporary
classicmodels_dlh,dim_customer,False
classicmodels_dlh,dim_date,False
classicmodels_dlh,dim_productlines,False
classicmodels_dlh,dim_products,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True
,display_query_6,True


### Integrate Reference Data with Real-Time Data
##### Bronze Table: Process 'Raw' JSON Data

In [0]:
# Streaming read of the JSON order files in orders_stream_dir using the "cloudFiles" source.
# Schema information is stored under orders_output_bronze, column types are inferred,
# and the resulting stream is exposed to SQL as the temp view "orders_raw_tempview".
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
-- Every raw order row gains the time it was received and the file it was read from.
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

A select statement that shows the fact orders bronze table:

In [0]:
%sql
-- Display the bronze view; it is built on a streaming read, so this query runs as a stream.
SELECT * FROM orders_bronze_tempview

customer_key,fact_order_key,order_date_key,order_id,order_status,price_each,product_key,quantity,required_date_key,shipped_date_key,_rescued_data,receipt_time,source_file
15,999,20031022,10165,Shipped,123.51,49,28,20031031,20031226.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
16,1000,20030603,10127,Shipped,126.39,49,25,20030609,20030606.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
16,1001,20041201,10349,Shipped,140.75,49,34,20041207,20041203.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
74,1002,20040505,10247,Shipped,143.62,49,48,20040511,20040508.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
68,1003,20031114,10185,Shipped,127.82,49,39,20031121,20031120.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
73,1004,20030925,10152,Shipped,117.77,49,35,20031003,20031001.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
43,1005,20041022,10314,Shipped,129.26,49,29,20041101,20041023.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
26,1006,20050505,10413,Shipped,133.57,49,49,20050514,20050509.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
93,1007,20031106,10176,Shipped,140.75,49,36,20031115,20031112.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json
105,1008,20031126,10196,Shipped,126.39,49,27,20031203,20031201.0,,2024-05-06T19:58:35.297Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders02.json


In [0]:
# Append the bronze stream to a Delta table. The target is qualified with the
# lakehouse database so the write no longer depends on a `USE` statement having
# been run in an earlier SQL cell.
# NOTE(review): the returned StreamingQuery keeps running until it is stopped.
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table(f"{dst_database}.fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f186882dab0>

##### Silver Table: Include Reference Data

In [0]:
# Stream the bronze Delta table back in and expose it to SQL as "orders_silver_tempview".
# The table name is qualified with the lakehouse database so the read does not
# depend on the session's current database.
(spark.readStream
  .table(f"{dst_database}.fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
-- Display the rows streamed from the bronze table.
SELECT * FROM orders_silver_tempview

customer_key,fact_order_key,order_date_key,order_id,order_status,price_each,product_key,quantity,required_date_key,shipped_date_key,_rescued_data,receipt_time,source_file
86,1,20030106,10100,Shipped,136.0,23,30,20030113,20030110.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
11,2,20050210,10379,Shipped,156.4,23,39,20050218,20050211.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
57,3,20031105,10173,Shipped,168.3,23,24,20031115,20031109.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
118,4,20041117,10331,Shipped,154.7,23,44,20041123,20041123.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
30,5,20030318,10110,Shipped,153.0,23,42,20030324,20030320.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
6,6,20031112,10182,Shipped,159.8,23,44,20031121,20031118.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
6,7,20041021,10312,Shipped,146.2,23,48,20041027,20041023.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
80,8,20041125,10344,Shipped,168.3,23,45,20041202,20041129.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
2,9,20030521,10124,Shipped,153.0,23,21,20030529,20030525.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json
107,10,20040126,10214,Shipped,166.6,23,30,20040204,20040129.0,,2024-05-06T20:00:17.803Z,dbfs:/FileStore/project_data/retail/stream/orders/Classicmodels_FactOrders01.json


In [0]:
%sql
-- Inspect the column types of the streaming view before joining it to the dimensions.
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_order_key,bigint,
order_date_key,bigint,
order_id,bigint,
order_status,string,
price_each,double,
product_key,bigint,
quantity,bigint,
required_date_key,bigint,
shipped_date_key,bigint,


In [0]:
%sql
-- Enrich each streamed order row with attributes from the customer, product and date dimensions.
-- Customers and products use INNER JOINs, so an order row without a matching customer_key or
-- product_key is dropped. The date dimension is LEFT OUTER JOINed twice (order date as "od",
-- shipped date as "sd"), so rows whose date keys have no match are kept with NULL date attributes.
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.order_id,
      o.customer_key,
      c.contactLastName AS customer_last_name,
      c.contactFirstName AS customer_first_name,
      o.product_key,
      p.buyPrice AS product_buy_price,
      p.productLine AS product_line,
      o.order_date_key,
      od.day_name_of_week AS order_day_name_of_week,
      od.day_of_month AS order_day_of_month,
      od.weekday_weekend AS order_weekday_weekend,
      od.month_name AS order_month_name,
      od.calendar_quarter AS order_quarter,
      od.calendar_year AS order_year,
      o.shipped_date_key,
      o.required_date_key,
      sd.day_name_of_week AS shipped_day_name_of_week,
      sd.day_of_month AS shipped_day_of_month,
      sd.weekday_weekend AS shipped_weekday_weekend,
      sd.month_name AS shipped_month_name,
      sd.calendar_quarter AS shipped_calendar_quarter,
      sd.calendar_year AS shipped_calendar_year,
      o.quantity, 
      o.price_each, 
      o.order_status
  FROM orders_silver_tempview AS o
  INNER JOIN classicmodels_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN classicmodels_dlh.dim_products AS p
  ON p.product_key = o.product_key
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS od
  ON od.date_key = o.order_date_key
  LEFT OUTER JOIN classicmodels_dlh.dim_date AS sd
  ON sd.date_key = o.shipped_date_key
)

In [0]:
# Append the enriched (silver) stream to a Delta table. The target is qualified
# with the lakehouse database, matching the fully qualified name that the gold
# queries read from, instead of relying on the session's current database.
# NOTE(review): the returned StreamingQuery keeps running until it is stopped.
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table(f"{dst_database}.fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f186882ca90>

A select statement that shows the relationship between the "real-time" fact data and the static reference data (the fact table and the dimension tables were joined to build the silver table):

In [0]:
%sql
-- Display the silver table; the unqualified name resolves against the session's current database.
SELECT * FROM fact_orders_silver

fact_order_key,order_id,customer_key,customer_last_name,customer_first_name,product_key,product_buy_price,product_line,order_date_key,order_day_name_of_week,order_day_of_month,order_weekday_weekend,order_month_name,order_quarter,order_year,shipped_date_key,required_date_key,shipped_day_name_of_week,shipped_day_of_month,shipped_weekday_weekend,shipped_month_name,shipped_calendar_quarter,shipped_calendar_year,quantity,price_each,order_status
1,10100,86,Young,Dorothy,23,86.7,Vintage Cars,20030106,Monday,6,Weekday,January,1,2003,20030110.0,20030113,Friday,10.0,Weekday,January,1.0,2003.0,30,136.0,Shipped
2,10379,11,Freyre,Diego,23,86.7,Vintage Cars,20050210,Thursday,10,Weekday,February,1,2005,20050211.0,20050218,Friday,11.0,Weekday,February,1.0,2005.0,39,156.4,Shipped
3,10173,57,Rovelli,Giovanni,23,86.7,Vintage Cars,20031105,Wednesday,5,Weekday,November,4,2003,20031109.0,20031115,Sunday,9.0,Weekend,November,4.0,2003.0,24,168.3,Shipped
4,10331,118,Salazar,Rosa,23,86.7,Vintage Cars,20041117,Wednesday,17,Weekday,November,4,2004,20041123.0,20041123,Tuesday,23.0,Weekday,November,4.0,2004.0,44,154.7,Shipped
5,10110,30,Ashworth,Rachel,23,86.7,Vintage Cars,20030318,Tuesday,18,Weekday,March,1,2003,20030320.0,20030324,Thursday,20.0,Weekday,March,1.0,2003.0,42,153.0,Shipped
6,10182,6,Nelson,Susan,23,86.7,Vintage Cars,20031112,Wednesday,12,Weekday,November,4,2003,20031118.0,20031121,Tuesday,18.0,Weekday,November,4.0,2003.0,44,159.8,Shipped
7,10312,6,Nelson,Susan,23,86.7,Vintage Cars,20041021,Thursday,21,Weekday,October,4,2004,20041023.0,20041027,Saturday,23.0,Weekend,October,4.0,2004.0,48,146.2,Shipped
8,10344,80,Lebihan,Laurence,23,86.7,Vintage Cars,20041125,Thursday,25,Weekday,November,4,2004,20041129.0,20041202,Monday,29.0,Weekday,November,4.0,2004.0,45,168.3,Shipped
9,10124,2,King,Jean,23,86.7,Vintage Cars,20030521,Wednesday,21,Weekday,May,2,2003,20030525.0,20030529,Sunday,25.0,Weekend,May,2.0,2003.0,21,153.0,Shipped
10,10214,107,Sommer,Martín,23,86.7,Vintage Cars,20040126,Monday,26,Weekday,January,1,2004,20040129.0,20040204,Thursday,29.0,Weekday,January,1.0,2004.0,30,166.6,Shipped


In [0]:
%sql
-- Inspect the column definitions and table metadata of the silver fact table.
DESCRIBE EXTENDED classicmodels_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
order_id,bigint,
customer_key,bigint,
customer_last_name,string,
customer_first_name,string,
product_key,bigint,
product_buy_price,double,
product_line,varchar(65535),
order_date_key,bigint,
order_day_name_of_week,varchar(10),


##### Gold Table: Perform Aggregations

The statements below create the new Gold table using the CTAS approach and then select from it. The table includes the number of products ordered (counted as order lines) per customer for each order month, along with the customer's ID, first and last name, and the month in which the order was placed.

In [0]:
%sql
-- Gold aggregate: one row per customer and order month name, with the number of silver rows
-- (non-NULL product_key values) in that group.
-- NOTE(review): the grouping uses the month name only, not the year, so the same month of
-- different years lands in one group - confirm this is intended.
-- NOTE(review): the ORDER BY sits inside the CTAS and the SELECT below has none, so the
-- display order presumably is not guaranteed - confirm.
CREATE OR REPLACE TABLE classicmodels_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , order_month_name AS OrderMonth
    , COUNT(product_key) AS ProductCount
  FROM classicmodels_dlh.fact_orders_silver
  GROUP BY CustomerID, LastName, FirstName, OrderMonth
  ORDER BY ProductCount DESC);

-- Show the aggregated result.
SELECT * FROM classicmodels_dlh.fact_monthly_orders_by_customer_gold;

CustomerID,LastName,FirstName,OrderMonth,ProductCount
11,Freyre,Diego,December,57
6,Nelson,Susan,March,42
11,Freyre,Diego,February,41
11,Freyre,Diego,May,34
86,Young,Dorothy,November,30
57,Rovelli,Giovanni,November,30
6,Nelson,Susan,August,29
11,Freyre,Diego,January,29
69,Brown,Julie,October,28
6,Nelson,Susan,July,28


In [0]:
%sql
-- Gold table pairing every silver order row with its customer's overall row count.
-- The subquery "pc" counts silver rows per customer_key; joining it back to the silver table
-- yields one output row per silver row, so ProductCount repeats for each of a customer's rows
-- and the same (customer, product) pair can appear more than once.
-- NOTE(review): CustomerName carries the last name only - confirm this is intended.
CREATE OR REPLACE TABLE classicmodels_dlh.fact_product_orders_by_customer_gold AS (
  SELECT pc.CustomerID
    , os.customer_last_name AS CustomerName
    , os.product_key AS ProductNumber
    , pc.ProductCount
  FROM classicmodels_dlh.fact_orders_silver AS os
  INNER JOIN (
    SELECT customer_key AS CustomerID
    , COUNT(product_key) AS ProductCount
    FROM classicmodels_dlh.fact_orders_silver
    GROUP BY customer_key
  ) AS pc
  ON pc.CustomerID = os.customer_key
  ORDER BY ProductCount DESC);

-- Show the result.
SELECT * FROM classicmodels_dlh.fact_product_orders_by_customer_gold;

CustomerID,CustomerName,ProductNumber,ProductCount
11,Freyre,23,259
11,Freyre,27,259
11,Freyre,50,259
11,Freyre,50,259
11,Freyre,80,259
11,Freyre,16,259
11,Freyre,16,259
11,Freyre,18,259
11,Freyre,21,259
11,Freyre,21,259


#### Clean up the File System

In [0]:
# The bronze and silver writers (and any streaming display queries) are still
# running at this point. Stop them first so nothing keeps writing to the
# checkpoint and table directories while they are being deleted.
for active_query in spark.streams.active:
    active_query.stop()

# Same target as the former `%fs rm -r /FileStore/project_data/`.
# NOTE(review): this also removes the source files under retail/, so they must be
# uploaded again before the notebook can be re-run.
dbutils.fs.rm("dbfs:/FileStore/project_data/", True)