DS-2002: Capstone Project

Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure SQL Server Connection Information #####################
jdbc_hostname = "ds2002samplesql.database.windows.net"
jdbc_port = 1433
src_database = "sakila_dw"

connection_properties = {
  "user" : "burrelllizzie",
  "password" : "178Ch@ndler",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "SakilaBrick"
atlas_database_name = "sakila_box"
atlas_user_name = "burrelllizzie"
atlas_password = "178Chandler"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dw"

base_dir = "dbfs:/FileStore/ds2002capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_rentals/bronze"
output_silver = f"{database_dir}/fact_rentals/silver"
output_gold   = f"{database_dir}/fact_rentals/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_rentals", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
     jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.rafft.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure SQL Database
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a View in an Azure SQL database.

In [0]:
%sql
DROP DATABASE IF EXISTS salika_dw CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dw
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/ds2002capstone/sakila_dw"
WITH DBPROPERTIES (contains_pii = true, purpose = "ds2002 Capstone");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:sqlserver://ds2002sqlsample.database.windows.net:1433;database=sakila_dw",
  dbtable "Rental.vDimRental",
  user "burrelllizzie",
  password "178Ch@ndler"
)

In [0]:
%sql
USE DATABASE sakila_dw;

CREATE TABLE IF NOT EXISTS sakila_dw.dim_rental
COMMENT "Rentals Dimension Table"
LOCATION "dbfs:/FileStore/ds2002capstone/sakila_dw/dim_rental"
AS SELECT * FROM view_rental

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM adventure_works.dim_product LIMIT 5

ProductID,ProductNumber,ProductCategory,Name,ProductModelName,ProductDescription,Color,Size,Weight,StandardCost,ListPrice,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
994,BB-7421,Bottom Brackets,LL Bottom Bracket,LL Bottom Bracket,Chromoly steel.,,,223.0,23.9716,53.99,2007-07-01T00:00:00.000+0000,,,FA3C65CD-0A22-47E3-BDF6-53F1DC138C43,2008-03-11T10:01:36.827+0000
995,BB-8107,Bottom Brackets,ML Bottom Bracket,ML Bottom Bracket,Aluminum alloy cups; large diameter spindle.,,,168.0,44.9506,101.24,2007-07-01T00:00:00.000+0000,,,71AB847F-D091-42D6-B735-7B0C2D82FC84,2008-03-11T10:01:36.827+0000
996,BB-9108,Bottom Brackets,HL Bottom Bracket,HL Bottom Bracket,Aluminum alloy cups and a hollow axle.,,,170.0,53.9416,121.49,2007-07-01T00:00:00.000+0000,,,230C47C5-08B2-4CE3-B706-69C0BDD62965,2008-03-11T10:01:36.827+0000
984,BK-M18S-40,Mountain Bikes,"Mountain-500 Silver, 40",Mountain-500,"Suitable for any type of riding, on or off-road. Fits any budget. Smooth-shifting with a comfortable ride.",Silver,40.0,12405.69,308.2179,564.99,2007-07-01T00:00:00.000+0000,,,B96C057B-6416-4851-8D59-BCB37C8E6E51,2008-03-11T10:01:36.827+0000
985,BK-M18S-42,Mountain Bikes,"Mountain-500 Silver, 42",Mountain-500,"Suitable for any type of riding, on or off-road. Fits any budget. Smooth-shifting with a comfortable ride.",Silver,42.0,12596.19,308.2179,564.99,2007-07-01T00:00:00.000+0000,,,B8D1B5D9-8A39-4097-A04A-56E95559B534,2008-03-11T10:01:36.827+0000


In [0]:
%sql
DESCRIBE EXTENDED sakila_dw.dim_rental;

col_name,data_type,comment
ProductID,int,
ProductNumber,string,
ProductCategory,string,
Name,string,
ProductModelName,string,
ProductDescription,string,
Color,string,
Size,string,
Weight,"decimal(8,2)",
StandardCost,"decimal(19,4)",


##### 1.2. Create a New Table that Sources its Data from a Table in an Azure SQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:sqlserver://ds2002sqlsample.database.windows.net:1433;database=sakila_dw",
  dbtable "sak.DimDate",
  user "burrelllizzie",
  password "178Ch@ndler"
)

In [0]:
%sql
USE DATABASE sakila_dw;

CREATE TABLE IF NOT EXISTS sakila_dw.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002capstone/sakila_dw/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM sakila_dw.dim_date LIMIT 5

DateKey,Date,Day,DaySuffix,Weekday,WeekDayName,WeekDayName_Short,WeekDayName_FirstLetter,DOWInMonth,DayOfYear,WeekOfMonth,WeekOfYear,Month,MonthName,MonthName_Short,MonthName_FirstLetter,Quarter,QuarterName,Year,MMYYYY,MonthYear,IsWeekend,IsHoliday,HolidayName,SpecialDays,FinancialYear,FinancialQuater,FinancialMonth,FirstDateofYear,LastDateofYear,FirstDateofQuater,LastDateofQuater,FirstDateofMonth,LastDateofMonth,FirstDateofWeek,LastDateofWeek,CurrentYear,CurrentQuater,CurrentMonth,CurrentWeek,CurrentDay
20000101,2000-01-01,1,st,7,Saturday,SAT,S,1,1,1,1,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2022-04-01,2022-06-30,2000-01-01,2000-01-31,1999-12-26,2000-01-01,-22,-89,-268,-1167,-8165
20000102,2000-01-02,2,nd,1,Sunday,SUN,S,2,2,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2022-04-01,2022-06-30,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-22,-89,-268,-1166,-8164
20000103,2000-01-03,3,rd,2,Monday,MON,M,3,3,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2022-04-01,2022-06-30,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-22,-89,-268,-1166,-8163
20000104,2000-01-04,4,th,3,Tuesday,TUE,T,4,4,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2022-04-01,2022-06-30,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-22,-89,-268,-1166,-8162
20000105,2000-01-05,5,th,4,Wednesday,WED,W,5,5,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2022-04-01,2022-06-30,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-22,-89,-268,-1166,-8161


In [0]:
%sql
DESCRIBE EXTENDED sakila_dw.dim_date;

col_name,data_type,comment
DateKey,int,
Date,date,
Day,int,
DaySuffix,string,
Weekday,int,
WeekDayName,string,
WeekDayName_Short,string,
WeekDayName_FirstLetter,string,
DOWInMonth,int,
DayOfYear,smallint,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds3002-capstone/source_data/batch/AdventureWorksLT_DimAddress.csv,AdventureWorksLT_DimAddress.csv,32959,1653590846000
dbfs:/FileStore/ds3002-capstone/source_data/batch/AdventureWorksLT_DimAddress.json,AdventureWorksLT_DimAddress.json,106607,1653590846000
dbfs:/FileStore/ds3002-capstone/source_data/batch/AdventureWorksLT_DimCustomer.json,AdventureWorksLT_DimCustomer.json,328754,1653590846000
dbfs:/FileStore/ds3002-capstone/source_data/batch/AdventureWorksLT_DimProduct.json,AdventureWorksLT_DimProduct.json,196457,1653590846000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002capstone/data/batch'
json_files = {"customers" : 'sakila_DimCustomer.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

##### 2.3. Fetch Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "sakila_box").option("collection", "customers").load()
display(df_customer)

AddressLine1,AddressLine2,AddressType,City,CompanyName,CountryRegion,CustomerID,EmailAddress,FirstName,LastName,MiddleName,ModifiedDate,NameStyle,PasswordHash,PasswordSalt,Phone,PostalCode,SalesPerson,StateProvince,Suffix,Title,_id,rowguid
57251 Serene Blvd,,Main Office,Van Nuys,Professional Sales and Service,United States,29485,catherine0@adventure-works.com,Catherine,Abel,R.,2009-05-16T16:33:33.077,0,zh3goJUbYsPv92k4bVZuJtlLHwuvpQtu6uNcjkKSdF8=,rpyd5Tw=,747-555-0171,91411,adventure-works\linda3,California,,Ms.,List(63878bfdb3f58a7fbc96d6d5),392ae773-d7ec-48ac-b8d0-6e65b770285c
Tanger Factory,,Main Office,Branch,Riders Company,United States,29486,kim2@adventure-works.com,Kim,Abercrombie,,2009-05-16T16:33:33.077,0,4I8349R6c33cK+j1ef3dZt0JHOQ9MV7OvEXpCbwhQrQ=,rrgbG/U=,334-555-0137,55056,adventure-works\jillian0,Minnesota,,Ms.,List(63878bfdb3f58a7fbc96d6d6),2a5aba2e-8db0-4856-a773-21d185f1679e
6900 Sisk Road,,Main Office,Modesto,Area Bike Accessories,United States,29489,frances0@adventure-works.com,Frances,Adams,B.,2009-05-16T16:33:33.09,0,bmEI+phqLCE2jKmotM8SBAICQD2IvZEmy8X0LmUpMaw=,jA7oD80=,991-555-0183,95354,adventure-works\shu0,California,,Ms.,List(63878bfdb3f58a7fbc96d6d7),c353fe38-6147-40a3-944d-3736f6297b8c
Lewiston Mall,,Main Office,Lewiston,Bicycle Accessories and Kits,United States,29490,margaret0@adventure-works.com,Margaret,Smith,J.,2009-05-16T16:33:33.107,0,3cd7qJyW8ZTgRdwOO9nLYDZg2EM6lehJe/nqKlKKcPY=,i2U3DxA=,959-555-0151,83501,adventure-works\david8,Idaho,,Ms.,List(63878bfdb3f58a7fbc96d6d8),7f2d6183-6aee-4ad1-973d-f45a19b70bf7
Blue Ridge Mall,,Main Office,Kansas City,Valley Bicycle Specialists,United States,29492,jay1@adventure-works.com,Jay,Adams,,2009-05-16T16:33:33.123,0,jCFDuqUMHmknfadTRSkMvN0IDObtE/GslvN9q2Wa5xU=,117fxZM=,158-555-0142,64106,adventure-works\jillian0,Missouri,,Mr.,List(63878bfdb3f58a7fbc96d6d9),7a2eb695-b6b9-48d4-bad2-bd33e8ce8ca1
No. 25800-130 King Street West,,Main Office,Toronto,Vinyl and Plastic Goods Corporation,Canada,29494,samuel0@adventure-works.com,Samuel,Agcaoili,N.,2005-09-01T00:00:00,0,jt9vdIyi0zI03wECUFk1hdZLTVOqN09/Fdogi+cTeQU=,uFYBREA=,554-555-0110,M4B 1V5,adventure-works\josé1,Ontario,,Mr.,List(63878bfdb3f58a7fbc96d6da),66d2dc13-4220-43b8-b2c7-131183caec34
6500 East Grant Road,,Main Office,Tucson,Fun Toys and Bikes,United States,29496,robert1@adventure-works.com,Robert,Ahlering,E.,2007-09-01T00:00:00,0,d35zXrfhsEHK6QrH/B7ipKUuuilEpY8u8rIunip5YWI=,r/UyVHY=,678-555-0175,85701,adventure-works\shu0,Arizona,,Mr.,List(63878bfdb3f58a7fbc96d6db),44e51ec2-9d0b-40cc-b725-1a3e81df9519
Eastridge Mall,,Main Office,Casper,Great Bikes,United States,29497,françois1@adventure-works.com,François,Ferrier,,2005-07-01T00:00:00,0,Li26cq1s3a+0YJcgjemlepj98r5eUwJlyHGmDJnSCWI=,NVfuzjo=,571-555-0128,82601,adventure-works\david8,Wyoming,,Mr.,List(63878bfdb3f58a7fbc96d6dc),eeef3c65-08f9-49c6-ae12-a2fcdedacaf5
252851 Rowan Place,,Main Office,Richmond,Valley Toy Store,Canada,29499,amy1@adventure-works.com,Amy,Alberts,E.,2006-09-01T00:00:00,0,dNz/EQlgVlbj0uOpI0Y8Rh+/GFUH1HvBLJJ4pqdSDTE=,yvdbcxM=,727-555-0115,V6B 3P7,adventure-works\josé1,British Columbia,,Ms.,List(63878bfdb3f58a7fbc96d6dd),29cd70be-d5e5-4a7a-a1d5-521bcce2193d
White Mountain Mall,,Main Office,Rock Springs,Major Sport Suppliers,United States,29502,paul2@adventure-works.com,Paul,Alcorn,L.,2007-07-01T00:00:00,0,UxlXfO/0JyTpelLFzbqFj9Ie1Rv1OJKPd6Cnm1itSsU=,IZ5yIjI=,331-555-0162,82901,adventure-works\david8,Wyoming,,Mr.,List(63878bfdb3f58a7fbc96d6de),f193d485-3a5a-415a-9d3e-4a5db8b4a2a0


In [0]:
%scala
df_customer.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (Adventure Works) Metadata Database

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dw.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dw.dim_customer

col_name,data_type,comment
AddressLine1,string,
AddressLine2,string,
AddressType,string,
City,string,
CompanyName,string,
CountryRegion,string,
CustomerID,int,
EmailAddress,string,
FirstName,string,
LastName,string,


##### 2.5. Query the New Table in the Databricks Metadata Database

In [0]:
%sql
SELECT * FROM sakila_dw.dim_customer LIMIT 5

AddressLine1,AddressLine2,AddressType,City,CompanyName,CountryRegion,CustomerID,EmailAddress,FirstName,LastName,MiddleName,ModifiedDate,NameStyle,PasswordHash,PasswordSalt,Phone,PostalCode,SalesPerson,StateProvince,Suffix,Title,_id,rowguid
57251 Serene Blvd,,Main Office,Van Nuys,Professional Sales and Service,United States,29485,catherine0@adventure-works.com,Catherine,Abel,R.,2009-05-16T16:33:33.077,0,zh3goJUbYsPv92k4bVZuJtlLHwuvpQtu6uNcjkKSdF8=,rpyd5Tw=,747-555-0171,91411,adventure-works\linda3,California,,Ms.,List(63878bfdb3f58a7fbc96d6d5),392ae773-d7ec-48ac-b8d0-6e65b770285c
Tanger Factory,,Main Office,Branch,Riders Company,United States,29486,kim2@adventure-works.com,Kim,Abercrombie,,2009-05-16T16:33:33.077,0,4I8349R6c33cK+j1ef3dZt0JHOQ9MV7OvEXpCbwhQrQ=,rrgbG/U=,334-555-0137,55056,adventure-works\jillian0,Minnesota,,Ms.,List(63878bfdb3f58a7fbc96d6d6),2a5aba2e-8db0-4856-a773-21d185f1679e
6900 Sisk Road,,Main Office,Modesto,Area Bike Accessories,United States,29489,frances0@adventure-works.com,Frances,Adams,B.,2009-05-16T16:33:33.09,0,bmEI+phqLCE2jKmotM8SBAICQD2IvZEmy8X0LmUpMaw=,jA7oD80=,991-555-0183,95354,adventure-works\shu0,California,,Ms.,List(63878bfdb3f58a7fbc96d6d7),c353fe38-6147-40a3-944d-3736f6297b8c
Lewiston Mall,,Main Office,Lewiston,Bicycle Accessories and Kits,United States,29490,margaret0@adventure-works.com,Margaret,Smith,J.,2009-05-16T16:33:33.107,0,3cd7qJyW8ZTgRdwOO9nLYDZg2EM6lehJe/nqKlKKcPY=,i2U3DxA=,959-555-0151,83501,adventure-works\david8,Idaho,,Ms.,List(63878bfdb3f58a7fbc96d6d8),7f2d6183-6aee-4ad1-973d-f45a19b70bf7
Blue Ridge Mall,,Main Office,Kansas City,Valley Bicycle Specialists,United States,29492,jay1@adventure-works.com,Jay,Adams,,2009-05-16T16:33:33.123,0,jCFDuqUMHmknfadTRSkMvN0IDObtE/GslvN9q2Wa5xU=,117fxZM=,158-555-0142,64106,adventure-works\jillian0,Missouri,,Mr.,List(63878bfdb3f58a7fbc96d6d9),7a2eb695-b6b9-48d4-bad2-bd33e8ce8ca1


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
address_csv = f"{batch_dir}/sakila_dw_DimAddress.csv"

df_address = spark.read.format('csv').options(header='true', inferSchema='true').load(address_csv)
display(df_address)

AddressID,AddressLine1,AddressLine2,City,StateProvince,CountryRegion,PostalCode
532,#500-75 O'Connor Street,,Ottawa,Ontario,Canada,K4B 1S2
497,#9900 2700 Production Way,,Burnaby,British Columbia,Canada,V5A 4X1
859,1050 Oak Street,,Seattle,Washington,United States,98104
604,1200 First Ave.,,Joliet,Illinois,United States,60433
1038,123 Camelia Avenue,,Oxnard,California,United States,93030
594,123 W. Lake Ave.,,Peoria,Illinois,United States,61606
559,12345 Sterling Avenue,,Irving,Texas,United States,75061
11,1318 Lasalle Street,,Bothell,Washington,United States,98011
855,15 East Main,,Port Orchard,Washington,United States,98366
11380,165 North Main,,Austin,Texas,United States,78701


In [0]:
df_address.printSchema()

In [0]:
df_address.write.format("delta").mode("overwrite").saveAsTable("sakila_dw.dim_address")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dw.dim_address;

col_name,data_type,comment
AddressID,int,
AddressLine1,string,
AddressLine2,string,
City,string,
StateProvince,string,
CountryRegion,string,
PostalCode,string,
,,
# Partitioning,,
Not partitioned,,


In [0]:
%sql
SELECT * FROM sakila_dw.dim_address LIMIT 5;

AddressID,AddressLine1,AddressLine2,City,StateProvince,CountryRegion,PostalCode
532,#500-75 O'Connor Street,,Ottawa,Ontario,Canada,K4B 1S2
497,#9900 2700 Production Way,,Burnaby,British Columbia,Canada,V5A 4X1
859,1050 Oak Street,,Seattle,Washington,United States,98104
604,1200 First Ave.,,Joliet,Illinois,United States,60433
1038,123 Camelia Avenue,,Oxnard,California,United States,93030


##### Verify Dimension Tables

In [0]:
%sql
USE sakila_dw;
SHOW TABLES

database,tableName,isTemporary
adventure_works,dim_address,False
adventure_works,dim_customer,False
adventure_works,dim_date,False
adventure_works,dim_product,False
,view_date,True
,view_product,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "SalesOrderID INT")
 .option("cloudFiles.schemaHints", "RevisionNumber TINYINT")
 .option("cloudFiles.schemaHints", "OrderDate TIMESTAMP")
 .option("cloudFiles.schemaHints", "DueDate TIMESTAMP") 
 .option("cloudFiles.schemaHints", "ShipDate TIMESTAMP")
 .option("cloudFiles.schemaHints", "Status TINYINT")
 .option("cloudFiles.schemaHints", "OnlineOrderFlag BINARY")
 .option("cloudFiles.schemaHints", "SalesOrderNumber STRING")
 .option("cloudFiles.schemaHints", "PurchaseOrderNumber STRING") 
 .option("cloudFiles.schemaHints", "AccountNumber STRING")
 .option("cloudFiles.schemaHints", "CustomerID INT")
 .option("cloudFiles.schemaHints", "ShipToAddressID INT")
 .option("cloudFiles.schemaHints", "BillToAddressID INT")
 .option("cloudFiles.schemaHints", "ShipMethod STRING")
 .option("cloudFiles.schemaHints", "SubTotal FLOAT")
 .option("cloudFiles.schemaHints", "TaxAmt FLOAT")
 .option("cloudFiles.schemaHints", "Freight FLOAT")
 .option("cloudFiles.schemaHints", "TotalDue FLOAT")
 .option("cloudFiles.schemaHints", "SalesOrderDetailID INT")
 .option("cloudFiles.schemaHints", "OrderQty SMALLINT")
 .option("cloudFiles.schemaHints", "ProductID INT")
 .option("cloudFiles.schemaHints", "UnitPrice FLOAT")
 .option("cloudFiles.schemaHints", "UnitPriceDiscount FLOAT")
 .option("cloudFiles.schemaHints", "LineTotal DECIMAL")
 .option("cloudFiles.schemaHints", "rowguid STRING")
 .option("cloudFiles.schemaHints", "ModifiedDate TIMESTAMP")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM rentals_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

AccountNumber,BillToAddressID,CustomerID,DueDate,Freight,LineTotal,ModifiedDate,OnlineOrderFlag,OrderDate,OrderQty,ProductID,PurchaseOrderNumber,RevisionNumber,SalesOrderDetailID,SalesOrderID,SalesOrderNumber,ShipDate,ShipMethod,ShipToAddressID,Status,SubTotal,TaxAmt,TotalDue,UnitPrice,UnitPriceDiscount,rowguid,_rescued_data,receipt_time,source_file
10-4020-000609,1092,29847,2008-06-13T00:00:00,22.0087,356.898,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,836,PO348186287,2,110562,71774,SO71774,2008-06-08T00:00:00,CARGO TRANSPORT 5,1092,5,880.3484,70.4279,972.785,356.898,0.0,e3a1994c-7a68-4ce8-96a3-77fdd3bbd730,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000609,1092,29847,2008-06-13T00:00:00,22.0087,356.898,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,822,PO348186287,2,110563,71774,SO71774,2008-06-08T00:00:00,CARGO TRANSPORT 5,1092,5,880.3484,70.4279,972.785,356.898,0.0,5c77f557-fdb6-43ba-90b9-9a7aec55ca32,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000106,640,30072,2008-06-13T00:00:00,1.9703,63.9,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,907,PO19952192051,2,110567,71776,SO71776,2008-06-08T00:00:00,CARGO TRANSPORT 5,640,5,78.81,6.3048,87.0851,63.9,0.0,6dbfe398-d15d-425e-aa58-88178fe360e5,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,873.816,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,4,905,PO19604173239,2,110616,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,218.454,0.0,377246c9-4483-48ed-a5b9-e56f005364e0,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,923.388,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,2,983,PO19604173239,2,110617,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,461.694,0.0,43a54bcd-536d-4a1b-8e69-24d083507a14,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,406.7928,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,6,988,PO19604173239,2,110618,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,112.998,0.4,12706fab-f3a2-48c6-b7c7-1ccde4081f18,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,1637.4,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,2,748,PO19604173239,2,110619,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,818.7,0.0,b12f0d3b-5b4e-4f1f-b2f0-f7cde99dd826,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,323.994,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,990,PO19604173239,2,110620,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,323.994,0.0,f117a449-039d-44b8-a4b2-b12001dacc01,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,149.874,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,926,PO19604173239,2,110621,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,149.874,0.0,92e5052b-72d0-4c91-9a8c-42591803667e,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,809.76,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,743,PO19604173239,2,110622,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,809.76,0.0,8bd33bed-c4f6-4d44-84fb-a7d04afcd794,,2022-11-30T17:00:01.266+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json


In [0]:
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_rentals_bronze")
  .createOrReplaceTempView("rentals_silver_tempview"))

In [0]:
%sql
SELECT * FROM rentals_silver_tempview

AccountNumber,BillToAddressID,CustomerID,DueDate,Freight,LineTotal,ModifiedDate,OnlineOrderFlag,OrderDate,OrderQty,ProductID,PurchaseOrderNumber,RevisionNumber,SalesOrderDetailID,SalesOrderID,SalesOrderNumber,ShipDate,ShipMethod,ShipToAddressID,Status,SubTotal,TaxAmt,TotalDue,UnitPrice,UnitPriceDiscount,rowguid,_rescued_data,receipt_time,source_file
10-4020-000609,1092,29847,2008-06-13T00:00:00,22.0087,356.898,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,836,PO348186287,2,110562,71774,SO71774,2008-06-08T00:00:00,CARGO TRANSPORT 5,1092,5,880.3484,70.4279,972.785,356.898,0.0,e3a1994c-7a68-4ce8-96a3-77fdd3bbd730,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000609,1092,29847,2008-06-13T00:00:00,22.0087,356.898,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,822,PO348186287,2,110563,71774,SO71774,2008-06-08T00:00:00,CARGO TRANSPORT 5,1092,5,880.3484,70.4279,972.785,356.898,0.0,5c77f557-fdb6-43ba-90b9-9a7aec55ca32,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000106,640,30072,2008-06-13T00:00:00,1.9703,63.9,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,907,PO19952192051,2,110567,71776,SO71776,2008-06-08T00:00:00,CARGO TRANSPORT 5,640,5,78.81,6.3048,87.0851,63.9,0.0,6dbfe398-d15d-425e-aa58-88178fe360e5,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,873.816,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,4,905,PO19604173239,2,110616,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,218.454,0.0,377246c9-4483-48ed-a5b9-e56f005364e0,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,923.388,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,2,983,PO19604173239,2,110617,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,461.694,0.0,43a54bcd-536d-4a1b-8e69-24d083507a14,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,406.7928,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,6,988,PO19604173239,2,110618,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,112.998,0.4,12706fab-f3a2-48c6-b7c7-1ccde4081f18,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,1637.4,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,2,748,PO19604173239,2,110619,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,818.7,0.0,b12f0d3b-5b4e-4f1f-b2f0-f7cde99dd826,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,323.994,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,990,PO19604173239,2,110620,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,323.994,0.0,f117a449-039d-44b8-a4b2-b12001dacc01,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,149.874,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,926,PO19604173239,2,110621,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,149.874,0.0,92e5052b-72d0-4c91-9a8c-42591803667e,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json
10-4020-000340,653,30113,2008-06-13T00:00:00,960.4672,809.76,2008-06-01T00:00:00.000+0000,0,2008-06-01T00:00:00,1,743,PO19604173239,2,110622,71780,SO71780,2008-06-08T00:00:00,CARGO TRANSPORT 5,653,5,38418.6895,3073.4952,42452.6519,809.76,0.0,8bd33bed-c4f6-4d44-84fb-a7d04afcd794,,2022-11-30T17:00:07.160+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/SalesOrders01.json


In [0]:
%sql
DESCRIBE EXTENDED rentals_silver_tempview

col_name,data_type,comment
AccountNumber,string,
BillToAddressID,bigint,
CustomerID,bigint,
DueDate,string,
Freight,double,
LineTotal,double,
ModifiedDate,timestamp,
OnlineOrderFlag,string,
OrderDate,string,
OrderQty,bigint,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_id,
        r.rental_date,
        r.inventory_id,
        r.customer_id,
        r.return_date,
        r.staff_id,
        r.last_update, 

        p.amount,
        p.payment_date

        FROM sakila.rental as r
        INNER JOIN sakila.payment as p
        ON r.rental_id = p.rental_id;
        INNER JOIN adventure_works.dim_date od
        ON CAST(t.OrderDate AS DATE) = od.Date
        INNER JOIN adventure_works.dim_date dd
        ON CAST(t.DueDate AS DATE) = dd.Date
        INNER JOIN adventure_works.dim_date sd
        ON CAST(t.ShipDate AS DATE) = sd.Date)

In [0]:
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_silver"))

In [0]:
%sql
SELECT * FROM fact_rentals_silver

SalesOrderID,RevisionNumber,OrderMonth,OrderDayName,OrderDay,OrderYear,DueMonth,DueDayName,DueDate,DueYear,ShipMonth,ShipDayName,ShipDay,ShipYear,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,FirstName,LastName,ShipToAddressID,ShipToAddressLine1,ShipToAddressLine2,ShipToCity,ShipToStateProvince,ShipToPostalCode,BillToAddressID,BillToAddressLine1,BillToAddressLine2,BillToCity,BillToStateProvince,BillToPostalCode,ShipMethod,SubTotal,TaxAmt,Freight,TotalDue,SalesOrderDetailID,OrderQty,ProductID,ProductNumber,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate,receipt_time,source_file


Output can only be rendered in Databricks

In [0]:
%sql
DESCRIBE EXTENDED sakila_dw.fact_rentals_silver

col_name,data_type,comment
SalesOrderID,bigint,
RevisionNumber,bigint,
OrderMonth,string,
OrderDayName,string,
OrderDay,int,
OrderYear,int,
DueMonth,string,
DueDayName,string,
DueDate,int,
DueYear,int,


##### 6.4. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT CustomerID
  , LastName
  , FirstName
  , RentalMonth
  , COUNT(RentalID) AS RentalCount
FROM sakila_dw.fact_rentals_silver
GROUP BY CustomerID, LastName, FirstName, OrderMonth
ORDER BY RentalCount DESC

CustomerID,LastName,FirstName,OrderMonth,ProductCount
29929,Kurtz,Jeffrey,June,50
30050,Sunkammurali,Krishna,June,46
29796,Grande,Jon,June,46
29485,Abel,Catherine,June,43
29957,Liu,Kevin,June,43
29736,Eminhizer,Terry,June,43
29546,Beck,Christopher,June,42
29938,Campbell,Frank,June,31
29922,Kotc,Pamala,June,30
30113,Venugopal,Raja,June,29


In [0]:
%sql
SELECT pc.CustomerID
  , os.LastName AS CustomerName
  , os.ProductNumber
  , pc.ProductCount
FROM sakila.fact_rentals_silver AS os
INNER JOIN (
  SELECT CustomerID
  , COUNT(RentalID) AS RentalCount
  FROM sakila_dw.fact_rentals_silver
  GROUP BY CustomerID
) AS pc
ON pc.CustomerID = os.CustomerID
ORDER BY RentalCount DESC

CustomerID,CustomerName,ProductNumber,ProductCount
29929,Kurtz,HY-1023-70,50
29929,Kurtz,BK-M18B-40,50
29929,Kurtz,PD-M340,50
29929,Kurtz,HL-U509,50
29929,Kurtz,SE-M236,50
29929,Kurtz,BK-M18S-40,50
29929,Kurtz,BK-M68S-38,50
29929,Kurtz,BK-M18B-44,50
29929,Kurtz,BK-M38S-42,50
29929,Kurtz,SJ-0194-L,50
