## DS-2002: Capstone Project

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure SQL Server Connection Information #####################
jdbc_hostname = "dsmysqlcapstone.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classic_dw"

connection_properties = {
  "user" : "rsingh",
  "password" : "?Swaran+1942",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "classic0"
atlas_database_name = "classicmodels"
atlas_user_name = "rsingh"
atlas_password = "ZvjUdS5eKaFGiGwS"

# Data Files (JSON) Information ###############################
dst_database = "AZClassic_DW"

base_dir = "dbfs:/FileStore/rsingh-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_transactions/bronze"
output_silver = f"{database_dir}/fact_transactions/silver"
output_gold   = f"{database_dir}/fact_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[291]: True

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.yig585i.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.yig585i.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Manipulation of Data From an Multiple Sources (Azure MySQL, MongoDB, CSV)
#### 1.0. Fetch Reference Data From an Azure SQL Database
##### 1.1. Create a Databricks Database, and then Create Workers and Date Dimension Tables that Source Data from a View in an Azure SQL database.

In [0]:
%sql
DROP DATABASE IF EXISTS AZClassic_DW CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS AZClassic_DW
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/rsingh-capstone/AZClassic_DW"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_workers
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://dsmysqlcapstone.mysql.database.azure.com:3306/classic_dw",
  dbtable "dim_workers",
  user "rsingh",
  password "?Swaran+1942"
)

In [0]:
%sql
USE DATABASE AZClassic_DW;

CREATE TABLE IF NOT EXISTS AZClassic_DW.dim_workers
COMMENT "Workers Dimension Table"
LOCATION "dbfs:/FileStore/rsingh-capstone/AZClassic_DW/dim_workers"
AS SELECT * FROM view_workers

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM AZClassic_DW.dim_workers LIMIT 5

empID,lastName,firstName,officeID,reportsTo,jobTitle,city,addressLine1,addressLine2,state,country
1002,Murphy,Diane,1,,President,San Francisco,100 Market Street,Suite 300,CA,USA
1056,Patterson,Mary,1,1002.0,VP Sales,San Francisco,100 Market Street,Suite 300,CA,USA
1076,Firrelli,Jeff,1,1002.0,VP Marketing,San Francisco,100 Market Street,Suite 300,CA,USA
1088,Patterson,William,6,1056.0,Sales Manager (APAC),Sydney,5-11 Wentworth Avenue,Floor #2,,Australia
1102,Bondur,Gerard,4,1056.0,Sale Manager (EMEA),Paris,43 Rue Jouffroy D'abbans,,,France


In [0]:
%sql
DESCRIBE EXTENDED AZClassic_DW.dim_workers;

col_name,data_type,comment
empID,bigint,
lastName,string,
firstName,string,
officeID,string,
reportsTo,double,
jobTitle,string,
city,string,
addressLine1,string,
addressLine2,string,
state,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://dsmysqlcapstone.mysql.database.azure.com:3306/classic_dw",
  dbtable "dim_date",
  user "rsingh",
  password "?Swaran+1942"
)

In [0]:
%sql
USE DATABASE AZClassic_DW;

CREATE TABLE IF NOT EXISTS AZClassic_DW.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/rsingh-capstone/AZClassic_DW/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM AZClassic_DW.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.2. Create a Transactions Fact Table

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_transactions 
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://dsmysqlcapstone.mysql.database.azure.com:3306/classic_dw",
  dbtable "fact_transactions",
  user "rsingh",
  password "?Swaran+1942"
)

In [0]:
%sql
USE DATABASE AZClassic_DW;

CREATE TABLE IF NOT EXISTS AZClassic_DW.fact_transactions
COMMENT "Transactions Fact Table"
LOCATION "dbfs:/FileStore/rsingh-capstone/AZClassic_DW/fact_transactions"
AS SELECT * FROM view_transactions

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM AZClassic_DW.fact_transactions LIMIT 5

order_key,order_id,orderDate,requiredDate,shippedDate,status,customerID,productCode,quantityOrdered,priceEach
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S18_1749,30,136.0
2,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S18_2248,50,55.09
3,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S18_4409,22,75.46
4,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S24_3969,49,35.29
5,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128,S18_2325,25,108.06


In [0]:
%sql
DESCRIBE EXTENDED AZClassic_DW.fact_transactions;

col_name,data_type,comment
order_key,bigint,
order_id,bigint,
orderDate,date,
requiredDate,date,
shippedDate,date,
status,string,
customerID,bigint,
productCode,string,
quantityOrdered,bigint,
priceEach,double,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database 
##### 2.1. Retrieval of MongoDB Data

In [0]:
%scala
import com.mongodb.spark._

val df_customers = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "classicmodels").option("collection", "customers").load()
display(df_customers)

_id,addressLine1,addressLine2,city,contactFirstName,contactLastName,country,creditLimit,customerName,customerNumber,phone,postalCode,salesRepEmployeeNumber,state
List(63995b6185290a41676e80d9),"67, rue des Cinquante Otages",,Nantes,Janine,Labrune,France,118200.0,La Rochelle Gifts,119,40.67.8555,44000,1370,
List(63995b6185290a41676e80da),Erling Skakkes gate 78,,Stavern,Jonas,Bergulfsen,Norway,81700.0,Baane Mini Imports,121,07-98 9555,4110,1504,
List(63995b6185290a41676e80e2),Vinbæltet 34,,Kobenhavn,Jytte,Petersen,Denmark,83400.0,Danish Wholesale Imports,145,31 12 3555,1734,1401,
List(63995b6185290a41676e80f5),16780 Pompton St.,,Brickhaven,Leslie,Taylor,USA,23000.0,Auto-Moto Classics Inc.,198,6175558428,58339,1216,MA
List(63995b6185290a41676e80e0),"C/ Moralzarzal, 86",,Madrid,Diego,Freyre,Spain,227600.0,Euro+ Shopping Channel,141,(91) 555 94 44,28034,1370,
List(63995b6185290a41676e8110),39323 Spinnaker Dr.,,Cambridge,Marta,Hernandez,USA,123700.0,Marta's Replicas Co.,286,6175558555,51247,1216,MA
List(63995b6185290a41676e812a),8616 Spinnaker Dr.,,Boston,Juri,Yoshido,USA,41900.0,Gifts4AllAges.com,362,6175559555,51003,1216,MA
List(63995b6185290a41676e812b),2304 Long Airport Avenue,,Nashua,Dorothy,Young,USA,114200.0,Online Diecast Creations Co.,363,6035558647,62005,1216,NH
List(63995b6185290a41676e8131),15 McCallum Street,NatWest Center #13-03,Makati City,Arnold,Cruz,Philippines,81500.0,Cruz & Sons Co.,385,+63 2 555 3587,1227 MM,1621,
List(63995b6185290a41676e8142),1785 First Street,,New Bedford,Violeta,Benitez,USA,85800.0,FunGiftIdeas.com,462,5085552555,50553,1216,MA


In [0]:
%scala
df_customers.printSchema()

##### 2.2. Write the Customers Dataframe Into the Databricks Database

In [0]:
%scala
df_customers.write.format("delta").mode("overwrite").saveAsTable("AZClassic_DW.dim_customers")

In [0]:
%sql
SELECT * FROM AZClassic_DW.dim_customers LIMIT 5

_id,addressLine1,addressLine2,city,contactFirstName,contactLastName,country,creditLimit,customerName,customerNumber,phone,postalCode,salesRepEmployeeNumber,state
List(63995b6185290a41676e80d9),"67, rue des Cinquante Otages",,Nantes,Janine,Labrune,France,118200.0,La Rochelle Gifts,119,40.67.8555,44000,1370,
List(63995b6185290a41676e80da),Erling Skakkes gate 78,,Stavern,Jonas,Bergulfsen,Norway,81700.0,Baane Mini Imports,121,07-98 9555,4110,1504,
List(63995b6185290a41676e80e2),Vinbæltet 34,,Kobenhavn,Jytte,Petersen,Denmark,83400.0,Danish Wholesale Imports,145,31 12 3555,1734,1401,
List(63995b6185290a41676e80f5),16780 Pompton St.,,Brickhaven,Leslie,Taylor,USA,23000.0,Auto-Moto Classics Inc.,198,6175558428,58339,1216,MA
List(63995b6185290a41676e80e0),"C/ Moralzarzal, 86",,Madrid,Diego,Freyre,Spain,227600.0,Euro+ Shopping Channel,141,(91) 555 94 44,28034,1370,


In [0]:
%sql
DESCRIBE EXTENDED AZClassic_DW.dim_customers;

col_name,data_type,comment
_id,struct,
addressLine1,string,
addressLine2,string,
city,string,
contactFirstName,string,
contactLastName,string,
country,string,
creditLimit,double,
customerName,string,
customerNumber,int,


#### 3.0. Use PySpark to Read From a CSV File

In [0]:
products_csv = f"{batch_dir}/dim_products.csv"

df_products = spark.read.format('csv').options(header='true', inferSchema='true').load(products_csv)
display(df_products)

productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front suspension, gear-shift lever, footbrake lever, drive chain, wheels and steering. All parts are particularly delicate due to their precise scale and require special care and attention.",7933,48.81,95.7
S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305,98.58,214.3
S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddle bags located on side of motorcycle, detailed engine, working steering, working suspension, two leather seats, luggage rack, dual exhaust pipes, small saddle bag located on handle bars, two-tone paint with chrome accents, superior die-cast detail , rotating wheels , working kick stand, diecast metal with plastic parts and baked enamel finish.",6625,68.99,118.94
S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos and insignias, detachable rear wheelie bar, heavy diecast metal with resin parts, authentic multi-color tampo-printed graphics, separate engine drive belts, free-turning front fork, rotating tires and rear racing slick, certificate of authenticity, detailed engine, display stand",,,
,precision diecast replica,baked enamel finish,1:10 scale model,removable fender,"seat and tank cover piece for displaying the superior detail of the v-twin engine""",5582,91.02,193.66
S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3252,85.68,136.0
S10_4962,1962 LanciaA Delta 16V,Classic Cars,1:10,Second Gear Diecast,Features include: Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,6791,103.42,147.74
S12_1099,1968 Ford Mustang,Classic Cars,1:12,Autoart Studio Design,"Hood, doors and trunk all open to reveal highly detailed interior features. Steering wheel actually turns the front wheels. Color dark green.",68,95.34,194.57
S12_1108,2001 Ferrari Enzo,Classic Cars,1:12,Second Gear Diecast,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,3619,95.59,207.8
S12_1666,1958 Setra Bus,Trucks and Buses,1:12,Welly Diecast Productions,"Model features 30 windows, skylights & glare resistant glass, working steering system, original logos",1579,77.9,136.67


In [0]:
df_products.printSchema()

root
 |-- productCode: string (nullable = true)
 |-- productName: string (nullable = true)
 |-- productLine: string (nullable = true)
 |-- productScale: string (nullable = true)
 |-- productVendor: string (nullable = true)
 |-- productDescription: string (nullable = true)
 |-- quantityInStock: string (nullable = true)
 |-- buyPrice: string (nullable = true)
 |-- MSRP: string (nullable = true)



In [0]:
df_products.write.format("delta").mode("overwrite").saveAsTable("AZClassic_DW.dim_products")

In [0]:
%sql
DESCRIBE EXTENDED AZClassic_DW.dim_products;

col_name,data_type,comment
productCode,string,
productName,string,
productLine,string,
productScale,string,
productVendor,string,
productDescription,string,
quantityInStock,string,
buyPrice,string,
MSRP,string,
,,


In [0]:
%sql
SELECT * FROM AZClassic_DW.dim_products LIMIT 5;

productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front suspension, gear-shift lever, footbrake lever, drive chain, wheels and steering. All parts are particularly delicate due to their precise scale and require special care and attention.",7933.0,48.81,95.7
S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; detailed interior; detailed engine; opening hood; opening trunk; opening doors; and detailed chassis.,7305.0,98.58,214.3
S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddle bags located on side of motorcycle, detailed engine, working steering, working suspension, two leather seats, luggage rack, dual exhaust pipes, small saddle bag located on handle bars, two-tone paint with chrome accents, superior die-cast detail , rotating wheels , working kick stand, diecast metal with plastic parts and baked enamel finish.",6625.0,68.99,118.94
S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos and insignias, detachable rear wheelie bar, heavy diecast metal with resin parts, authentic multi-color tampo-printed graphics, separate engine drive belts, free-turning front fork, rotating tires and rear racing slick, certificate of authenticity, detailed engine, display stand",,,
,precision diecast replica,baked enamel finish,1:10 scale model,removable fender,"seat and tank cover piece for displaying the superior detail of the v-twin engine""",5582.0,91.02,193.66


##### Verify Dimension Tables

In [0]:
%sql
USE AZClassic_DW;
SHOW TABLES

database,tableName,isTemporary
azclassic_dw,dim_customers,False
azclassic_dw,dim_date,False
azclassic_dw,dim_products,False
azclassic_dw,dim_workers,False
azclassic_dw,fact_transactions,False
,display_query_1,True
,display_query_10,True
,display_query_11,True
,display_query_12,True
,display_query_13,True


### Section III: Integrate Reference Data with Real-Time Data
#### 1.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 1.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "order_key INT")
 .option("cloudFiles.schemaHints", "quantityOrdered INT")
 .option("cloudFiles.schemaHints", "orderDate TIMESTAMP")
 .option("cloudFiles.schemaHints", "requiredDate TIMESTAMP")
 .option("cloudFiles.schemaHints", "shippedDate TIMESTAMP")
 .option("cloudFiles.schemaHints", "status STRING")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("transactions_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW transactions_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM transactions_raw_tempview
)

In [0]:
%sql
SELECT * FROM transactions_bronze_tempview

customerID,orderDate,order_id,order_key,priceEach,productCode,quantityOrdered,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
141,2004-06-24,10262,1499,41.71,S72_1253,21,2004-07-01,,Cancelled,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1500,89.0,S10_1678,34,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1501,107.05,S10_2016,40,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1502,193.66,S10_4698,41,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1503,123.51,S12_2823,48,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1504,67.58,S18_2581,33,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1505,50.27,S18_2625,34,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1506,109.32,S24_1578,42,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1507,67.03,S24_2000,37,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json
175,2004-06-28,10263,1508,59.41,S24_4278,24,2004-07-04,2004-07-02,Shipped,,2022-12-15T01:47:06.581+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data3.json


In [0]:
(spark.table("transactions_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_transactions_bronze"))

Out[326]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f1e2ec12970>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_transactions_bronze")
  .createOrReplaceTempView("transactions_silver_tempview"))

In [0]:
%sql
SELECT * FROM transactions_silver_tempview

customerID,orderDate,order_id,order_key,priceEach,productCode,quantityOrdered,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
363,2003-01-06,10100,1,136.0,S18_1749,30,2003-01-13,2003-01-10,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
363,2003-01-06,10100,2,55.09,S18_2248,50,2003-01-13,2003-01-10,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
363,2003-01-06,10100,3,75.46,S18_4409,22,2003-01-13,2003-01-10,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
363,2003-01-06,10100,4,35.29,S24_3969,49,2003-01-13,2003-01-10,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
128,2003-01-09,10101,5,108.06,S18_2325,25,2003-01-18,2003-01-11,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
128,2003-01-09,10101,6,167.06,S18_2795,26,2003-01-18,2003-01-11,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
128,2003-01-09,10101,7,32.53,S24_1937,45,2003-01-18,2003-01-11,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
128,2003-01-09,10101,8,44.35,S24_2022,46,2003-01-18,2003-01-11,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
181,2003-01-10,10102,9,95.55,S18_1342,39,2003-01-18,2003-01-14,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json
181,2003-01-10,10102,10,43.13,S18_1367,41,2003-01-18,2003-01-14,Shipped,,2022-12-15T01:47:21.168+0000,dbfs:/FileStore/rsingh-capstone/source_data/stream/fact_data1.json


In [0]:
%sql
DESCRIBE EXTENDED transactions_silver_tempview

col_name,data_type,comment
customerID,bigint,
orderDate,string,
order_id,bigint,
order_key,bigint,
priceEach,double,
productCode,string,
quantityOrdered,bigint,
requiredDate,string,
shippedDate,string,
status,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_transactions_silver_tempview AS (
  SELECT t.order_key
  , c.customerName
  , t.customerID
  , t.quantityOrdered
  FROM transactions_silver_tempview t 
  INNER JOIN AZClassic_DW.dim_customers c 
  ON t.customerID = c.customerNumber)

In [0]:
(spark.table("fact_transactions_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_transactions_silver"))

Out[331]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f1e2ec0c940>

In [0]:
%sql
SELECT * FROM fact_transactions_silver 

order_key,customerName,customerID,quantityOrdered
1,Online Diecast Creations Co.,363,30
2,Online Diecast Creations Co.,363,50
3,Online Diecast Creations Co.,363,22
4,Online Diecast Creations Co.,363,49
5,"Blauer See Auto, Co.",128,25
6,"Blauer See Auto, Co.",128,26
7,"Blauer See Auto, Co.",128,45
8,"Blauer See Auto, Co.",128,46
9,Vitachrome Inc.,181,39
10,Vitachrome Inc.,181,41


Output can only be rendered in Databricks

In [0]:
%sql
DESCRIBE EXTENDED AZClassic_DW.fact_transactions_silver

col_name,data_type,comment
order_key,bigint,
customerName,string,
customerID,bigint,
quantityOrdered,bigint,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,AZClassic_DW,
Table,fact_transactions_silver,
Type,MANAGED,


Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customerName AS CustomerName
  , COUNT(quantityOrdered) AS ProductsOrdered
FROM AZClassic_DW.fact_transactions_silver
GROUP BY customerName
ORDER BY ProductsOrdered DESC

CustomerName,ProductsOrdered
Euro+ Shopping Channel,129
Mini Gifts Distributors Ltd.,112
"AV Stores, Co.",51
Land of Toys Inc.,49
Rovelli Gifts,48
"Australian Collectors, Co.",43
"Dragon Souveniers, Ltd.",42
"Saveley & Henriot, Co.",41
Muscle Machine Inc,38
Online Diecast Creations Co.,34
