#Final Project

##Import Required Libraries


In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

##2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "arv2vp-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "midterm"

connection_properties = {
  "user" : "arv2vp",
  "password" : "Was4137Jar0300Pogo!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "midterm.b84ee9q"
atlas_database_name = "midterm"
atlas_user_name = "charli"
atlas_password = "Was4137Jar0300Pogo!"

# Data Files (JSON) Information ###############################
dst_database = "midterm_dlh"

base_dir = "dbfs:/FileStore/lab-data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

##Define Global Functions

In [0]:
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()

    return result

#Populate Dimensions using Cold-Path Data

In [0]:
%sql
DROP DATABASE IF EXISTS midterm_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS midterm_dlh
COMMENT "DS-2002 Midterm Database"
LOCATION "dbfs:/FileStore/lab-data/midterm_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Midterm");

#####Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://arv2vp-mysql.mysql.database.azure.com:3306/midterm", --Replace with your Server Name
  dbtable "dim_date",
  user "arv2vp",    --Replace with your User Name
  password "Was4137Jar0300Pogo!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE midterm_dlh;

CREATE OR REPLACE TABLE midterm_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab-data/midterm_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED midterm_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM midterm_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://arv2vp-mysql.mysql.database.azure.com:3306/midterm", 
  dbtable "dim_products",
  user "arv2vp",   
  password "Was4137Jar0300Pogo!"  
)

In [0]:
%sql
-- Create a new table named "midterm_dlh.dim_product" using data from the view named "view_product"
USE DATABASE midterm_dlh;

CREATE OR REPLACE TABLE midterm_dlh.dim_product
COMMENT "Products Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/midterm_dlh/product"
AS SELECT * FROM view_product

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED midterm_dlh.dim_product;

col_name,data_type,comment
product_key,int,
ProductID,int,
ProductName,varchar(37),
Color,varchar(3),
ModelDescription,varchar(10),
FabricDescription,varchar(6),
Category,varchar(21),
Gender,varchar(19),
ProductLine,varchar(9),
Weight,int,


In [0]:
%sql
SELECT * FROM midterm_dlh.dim_product LIMIT 5

product_key,ProductID,ProductName,Color,ModelDescription,FabricDescription,Category,Gender,ProductLine,Weight,Size,PackSize,Status,PurchasePrice,product_date_key
1,1,3-182,,AT,182,Undershirts,Girls' Undershirts,Underwear,822,3,Dozen,In Production,6.6,2003-07-10
2,2,3-183,,AT,183,Undershirts,Girls' Undershirts,Underwear,620,3,Dozen,Out of Production,5.6,2003-07-10
3,3,3-184,,AT,184,Undershirts,Girls' Undershirts,Underwear,718,3,Dozen,In Production,6.3,2003-07-10
4,4,3-185,,AT,185,Undershirts,Boys' Undershirts,Underwear,820,3,Dozen,In Production,5.6,2003-07-10
5,5,3-A30N,,A,30,Briefs,Girls' Panties,Underwear,214,3,Dozen,Out of Production,3.3,2003-08-06


##2.0. Fetch Reference Data from a MongoDB Atlas Database
#####View the Data Files on the Databricks File System

#####2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection


In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab-data/retail/batch/Midterm_DimCustomers.json,Midterm_DimCustomers.json,64943,1715306932000
dbfs:/FileStore/lab-data/retail/batch/Midterm_DimEmployees.csv,Midterm_DimEmployees.csv,177,1715358388000
dbfs:/FileStore/lab-data/retail/batch/Midterm_DimEmployees.json,Midterm_DimEmployees.json,1191,1715306932000
dbfs:/FileStore/lab-data/retail/batch/Midterm_DimProducts.json,Midterm_DimProducts.json,1710963,1715306934000
dbfs:/FileStore/lab-data/retail/batch/Midterm_DimShippingMethods.json,Midterm_DimShippingMethods.json,412,1715306933000


In [0]:
source_dir = '/dbfs/FileStore/lab-data/retail/batch'
json_files = {"customers" : 'Midterm_DimCustomers.json'
              , "employees" : 'Midterm_DimEmployees.json'
              , "products" : "Midterm_DimProducts.json"
              , "shipping methods" : 'Midterm_DimShippingMethods.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f1c60a9fb40>

#####Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "arv2vp"
val pwd = "Was4137Jar0300Pogo!"
val clusterName = "midterm.b84ee9q"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "midterm")
.option("collection", "customers").load()
.select("customer_key", "CustomerID", "CustomerName", "Region", "Country", "PriceCategory", "CustomerClass", "LeadSource", "Discontinued")

display(df_customer)

customer_key,CustomerID,CustomerName,Region,Country,PriceCategory,CustomerClass,LeadSource,Discontinued
1,1,C1,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0
2,2,C2,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0
3,3,C3,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,1
4,4,C4,Moscow,Russian Federation,4,Large-Scale Wholesaler-1,Referral by the Central Office,1
5,5,C5,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0
6,6,C6,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0
7,7,C7,Moscow,Russian Federation,1,Consumer,Other,1
8,8,C8,Moscow,Russian Federation,5,Consumer,Other,1
9,9,C9,Moscow,Russian Federation,5,Retailer,Other,1
10,10,C10,Perm,Russian Federation,5,Retailer,Advertisement in National Wholesale Magazine,1


In [0]:
%scala
df_customer.printSchema()

#####Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
%scala
df_customer.write.format("delta")
  .mode("overwrite")
  .saveAsTable("midterm_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED midterm_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
CustomerID,int,
CustomerName,string,
Region,string,
Country,string,
PriceCategory,int,
CustomerClass,string,
LeadSource,string,
Discontinued,int,
,,


In [0]:
%sql
SELECT * FROM midterm_dlh.dim_customer LIMIT 5

customer_key,CustomerID,CustomerName,Region,Country,PriceCategory,CustomerClass,LeadSource,Discontinued
1,1,C1,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0
2,2,C2,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0
3,3,C3,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,1
4,4,C4,Moscow,Russian Federation,4,Large-Scale Wholesaler-1,Referral by the Central Office,1
5,5,C5,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,0


##Fetch Employee Dimension Data from the CSV

In [0]:
employee_csv = f"dbfs:/FileStore/lab-data/retail/batch/Midterm_DimEmployees.csv"

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
display(df_employee)

employee_key,EmployeeID,EmployeeName
1,1,E1
2,2,E2
3,3,E3
4,5,E4
5,6,E5
6,7,E6
7,8,E7
8,9,E8
9,10,E9
10,11,E10


In [0]:
df_employee.printSchema()

root
 |-- employee_key: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- EmployeeName: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("midterm_dlh.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED midterm_dlh.dim_employee

col_name,data_type,comment
employee_key,int,
EmployeeID,int,
EmployeeName,string,
,,
# Delta Statistics Columns,,
Column Names,"employee_key, EmployeeID, EmployeeName",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM midterm_dlh.dim_employee LIMIT 5

employee_key,EmployeeID,EmployeeName
1,1,E1
2,2,E2
3,3,E3
4,5,E4
5,6,E5


##### Fetch the Shipper Method Dimension Data from teh New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_shipper_method = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "midterm")
  .option("collection", "shipping methods")
  .option("uri", atlas_uri).load()
  .select("shipping_methods_key", "ShippingMethodID", "ShippingMethod")

display(df_shipper_method)

shipping_methods_key,ShippingMethodID,ShippingMethod
1,1,Ex Works
2,2,Door to Door Service
3,3,Container
4,4,Truck


In [0]:
%scala
df_shipper_method.printSchema()

In [0]:
%scala
df_shipper_method.write.format("delta").mode("overwrite").saveAsTable("midterm_dlh.dim_shipper_method")

In [0]:
%sql
DESCRIBE EXTENDED midterm_dlh.dim_shipper_method

col_name,data_type,comment
shipping_methods_key,int,
ShippingMethodID,int,
ShippingMethod,string,
,,
# Delta Statistics Columns,,
Column Names,"shipping_methods_key, ShippingMethodID, ShippingMethod",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM midterm_dlh.dim_shipper_method LIMIT 5

shipping_methods_key,ShippingMethodID,ShippingMethod
1,1,Ex Works
2,2,Door to Door Service
3,3,Container
4,4,Truck


#####Verify Tables

In [0]:
%sql
USE midterm_dlh;
SHOW TABLES

database,tableName,isTemporary
midterm_dlh,dim_customer,False
midterm_dlh,dim_date,False
midterm_dlh,dim_employee,False
midterm_dlh,dim_product,False
midterm_dlh,dim_shipper_method,False
,display_query_1,True
,display_query_2,True
,inventory_transactions_raw_tempview,True
,orders_bronze_tempview,True
,orders_raw_tempview,True


##Integrate Reference Data with Real-Time Data
#####Bronze Table

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

CustomerID,EmployeeID,FreightCharge,OrderDate,OrderID,ShipDate,ShippingMethodID,_rescued_data,receipt_time,source_file
1,1,0.0,7/10/2003,2,7/10/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
2,2,0.0,7/11/2003,4,7/11/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
2,1,0.0,7/15/2003,5,7/15/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
3,1,0.0,7/14/2003,6,7/14/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
4,1,0.0,7/14/2003,7,7/15/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
5,1,0.0,7/14/2003,8,7/14/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
5,1,0.0,7/15/2003,9,7/15/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
6,1,0.0,7/16/2003,10,7/16/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
3,1,0.0,7/16/2003,11,7/16/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
3,1,0.0,7/17/2003,12,7/17/2003,1.0,,2024-05-10T16:42:02.672Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f1c24815b10>

#####Silver Table

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

CustomerID,EmployeeID,FreightCharge,OrderDate,OrderID,ShipDate,ShippingMethodID,_rescued_data,receipt_time,source_file
1,1,0.0,7/10/2003,2,7/10/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
2,2,0.0,7/11/2003,4,7/11/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
2,1,0.0,7/15/2003,5,7/15/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
3,1,0.0,7/14/2003,6,7/14/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
4,1,0.0,7/14/2003,7,7/15/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
5,1,0.0,7/14/2003,8,7/14/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
5,1,0.0,7/15/2003,9,7/15/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
6,1,0.0,7/16/2003,10,7/16/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
3,1,0.0,7/16/2003,11,7/16/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json
3,1,0.0,7/17/2003,12,7/17/2003,1.0,,2024-05-10T16:42:08.517Z,dbfs:/FileStore/lab-data/retail/stream/orders/orders.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
CustomerID,bigint,
EmployeeID,bigint,
FreightCharge,double,
OrderDate,string,
OrderID,bigint,
ShipDate,string,
ShippingMethodID,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
      o.OrderID,
      o.employee_id_key,
      e.EmployeeName,
      o.customer_id_key,
      c.CustomerName,
      o.product_id_key,
      p.ProductName,
      p.PurchasePrice,
      p.Category AS ProductCategory,
      p.ProductLine,
      o.shipping_methods_key,
      s.ShippingMethod,
      o.order_date_key,
      o.shipped_date_key,
      o.QuantitySold,
      o.UnitSalesPrice,
      o.FreightCharge
  FROM orders_silver_tempview AS o
  INNER JOIN midterm_dlh.dim_employee AS e
  ON e.EmployeeID = o.employee_id_key
  INNER JOIN midterm_dlh.dim_customer AS c
  ON c.CustomerID = o.customer_id_key
  INNER JOIN midterm_dlh.dim_product AS p
  ON p.ProductID = o.product_id_key
)

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `o`.`employee_id_key` cannot be resolved. Did you mean one of the following? [`e`.`employee_key`, `o`.`EmployeeID`, `e`.`EmployeeName`, `e`.`EmployeeID`, `o`.`source_file`]. SQLSTATE: 42703; line 22 pos 20;
'CreateViewCommand `fact_orders_silver_tempview`, (
  SELECT o.fact_order_key,
      o.OrderID,
      o.employee_id_key,
      e.EmployeeName,
      o.customer_id_key,
      c.CustomerName,
      o.product_id_key,
      p.ProductName,
      p.PurchasePrice,
      p.Category AS ProductCategory,
      p.ProductLine,
      o.shipping_methods_key,
      s.ShippingMethod,
      o.order_date_key,
      o.shipped_date_key,
      o.QuantitySold,
      o.UnitSalesPrice,
      o.FreightCharge
  FROM orders_silver_tempview AS o
  INNER JOIN midterm_dlh.dim_employee AS e
  ON e.EmployeeID = o.employee_id_key
  INNER JOIN midterm_dlh.dim_customer AS c
 

In [0]:
%sql
SELECT * FROM fact_orders_silver

[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
File [0;32m<command-1660743566228016>, line 7[0m
[1;32m      1[0m source_dir [38;5;241m=[39m [38;5;124m'[39m[38;5;124m/dbfs/FileStore/lab_data/retail/batch[39m[38;5;124m'[39m
[1;32m      2[0m json_files [38;5;241m=[39m {[38;5;124m"[39m[38;5;124mcustomers[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimCustomers.json[39m[38;5;124m'[39m
[1;32m      3[0m               , [38;5;124m"[39m[38;5;124memployees[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimEmployees.json[39m[38;5;124m'[39m
[1;32m      4[0m               , [38;5;124m"[39m[38;5;124mproducts[39m[38;5;124m"[39m : [38;5;124m"[39m[38;5;124mMidterm_DimProducts.json[39m[38;5;124m"[39m
[1;32m      5[0m               , [38;5;124m"[39m[38;5;124mshipping methods[39m[38;5;124m"[39m : [38

In [0]:
%sql
DESCRIBE EXTENDED midterm_dlh.fact_orders_silver

[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
File [0;32m<command-1660743566228016>, line 7[0m
[1;32m      1[0m source_dir [38;5;241m=[39m [38;5;124m'[39m[38;5;124m/dbfs/FileStore/lab_data/retail/batch[39m[38;5;124m'[39m
[1;32m      2[0m json_files [38;5;241m=[39m {[38;5;124m"[39m[38;5;124mcustomers[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimCustomers.json[39m[38;5;124m'[39m
[1;32m      3[0m               , [38;5;124m"[39m[38;5;124memployees[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimEmployees.json[39m[38;5;124m'[39m
[1;32m      4[0m               , [38;5;124m"[39m[38;5;124mproducts[39m[38;5;124m"[39m : [38;5;124m"[39m[38;5;124mMidterm_DimProducts.json[39m[38;5;124m"[39m
[1;32m      5[0m               , [38;5;124m"[39m[38;5;124mshipping methods[39m[38;5;124m"[39m : [38

#####Gold Table

In [0]:
%sql
CREATE OR REPLACE TABLE midterm_dlh.fact_monthly_orders_by_customer_gold AS (
  SELECT CustomerID
    , CustomerName
    , COUNT(product_id_key) AS ProductCount
  FROM midterm_dlh.fact_orders_silver
  GROUP BY CustomerID, CustomerName
  ORDER BY ProductCount DESC);

SELECT * FROM midterm_dlh.fact_monthly_orders_by_customer_gold;

[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
File [0;32m<command-1660743566228016>, line 7[0m
[1;32m      1[0m source_dir [38;5;241m=[39m [38;5;124m'[39m[38;5;124m/dbfs/FileStore/lab_data/retail/batch[39m[38;5;124m'[39m
[1;32m      2[0m json_files [38;5;241m=[39m {[38;5;124m"[39m[38;5;124mcustomers[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimCustomers.json[39m[38;5;124m'[39m
[1;32m      3[0m               , [38;5;124m"[39m[38;5;124memployees[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimEmployees.json[39m[38;5;124m'[39m
[1;32m      4[0m               , [38;5;124m"[39m[38;5;124mproducts[39m[38;5;124m"[39m : [38;5;124m"[39m[38;5;124mMidterm_DimProducts.json[39m[38;5;124m"[39m
[1;32m      5[0m               , [38;5;124m"[39m[38;5;124mshipping methods[39m[38;5;124m"[39m : [38

In [0]:
%sql
CREATE OR REPLACE TABLE midterm_dlh.fact_product_orders_by_customer_gold AS (
  SELECT pc.CustomerID
    , os.CustomerName
    , os.PoductName
    , pc.ProductCount
  FROM midterm_dlh.fact_orders_silver AS os
  INNER JOIN (
    SELECT customer_key AS CustomerID
    , COUNT(product_key) AS ProductCount
    FROM midtermdlh.fact_orders_silver
    GROUP BY customer_key
  ) AS pc
  ON pc.CustomerID = os.customer_key
  ORDER BY ProductCount DESC);

SELECT * FROM midtrem_dlh.fact_product_orders_by_customer_gold;

[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
File [0;32m<command-1660743566228016>, line 7[0m
[1;32m      1[0m source_dir [38;5;241m=[39m [38;5;124m'[39m[38;5;124m/dbfs/FileStore/lab_data/retail/batch[39m[38;5;124m'[39m
[1;32m      2[0m json_files [38;5;241m=[39m {[38;5;124m"[39m[38;5;124mcustomers[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimCustomers.json[39m[38;5;124m'[39m
[1;32m      3[0m               , [38;5;124m"[39m[38;5;124memployees[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mMidterm_DimEmployees.json[39m[38;5;124m'[39m
[1;32m      4[0m               , [38;5;124m"[39m[38;5;124mproducts[39m[38;5;124m"[39m : [38;5;124m"[39m[38;5;124mMidterm_DimProducts.json[39m[38;5;124m"[39m
[1;32m      5[0m               , [38;5;124m"[39m[38;5;124mshipping methods[39m[38;5;124m"[39m : [38

##Clean the file system

In [0]:
%fs rm -r /FileStore/lab_data/

##Select Statement

In [0]:
%sql
sql_test = """
SELECT customers.`CustomerName` AS `customer_name`,
    SUM(orders.`QuantitySold`) AS `total_quantity`,
    SUM(orders.`UnitSalesPrice`) AS `total_unit_price`
FROM `{0}`.`fact_orders` AS orders
INNER JOIN `{0}`.`dim_customers` AS customers
ON orders.customer_id_key = customers.CustomerID
GROUP BY customers.`CustomerName`
ORDER BY total_unit_price DESC;
""".format(src_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)
df_test.head()

org.apache.spark.sql.catalyst.parser.ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'sql_test'. SQLSTATE: 42601 (line 1, pos 0)

== SQL ==
sql_test = """
^^^
SELECT customers.`CustomerName` AS `customer_name`,
    SUM(orders.`QuantitySold`) AS `total_quantity`,
    SUM(orders.`UnitSalesPrice`) AS `total_unit_price`
FROM `{0}`.`fact_orders` AS orders
INNER JOIN `{0}`.`dim_customers` AS customers
ON orders.customer_id_key = customers.CustomerID
GROUP BY customers.`CustomerName`
ORDER BY total_unit_price DESC;
""".format(src_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)
df_test.head()

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(parsers.scala:301)
	at org.apache.spark.sql.catalyst.parser.AbstractParser.parse(parsers.scala:114)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:124)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(AbstractSqlParser.scala:87)
	at com.databr