### Aida Final DS2002 Project

For the final capstone project I used the Chinook data warehouse. My date and genre dimensions came from MySQL, my employee dimension came from MongoDB, and the other dimensions were JSON files in cloud storage, located in the batch folder. My invoice details fact table was split into three equal intervals located in the stream folder. I integrated static and real-time data and used the bronze, silver, and gold architecture to demonstrate populating my dimensional data mart. I also include an aggregation query alongside visualizations (including a pivot table) to show that my aggregation works correctly.

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### We will instantiate global variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "amd9asz-mysql2.mysql.database.azure.com"
jdbc_port = 3306
src_database = "chinook_dw"

connection_properties = {
  "user" : "amd9asz",
  "password" : "Password123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "aidaclust.hrxvgai"
atlas_database_name = "chinook_dw"
atlas_user_name = "amd9asz"
atlas_password = "Password123"

mongo_uri = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
print(mongo_uri)

# Data Files (JSON) Information ###############################
dst_database = "chinook_d1h"

base_dir = "dbfs:/FileStore/final_data"
database_dir = f"{base_dir}/{dst_database}"

batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"

inv_stream_dir = f"{stream_dir}/invoice_details" #fact table data

inv_output_bronze = f"{database_dir}/fact_inv_details/bronze"
inv_output_silver = f"{database_dir}/fact_inv_details/silver"
inv_output_gold   = f"{database_dir}/fact_inv_details/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_invoice_details", True) 


# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

mongodb+srv://amd9asz:Password123@aidaclust.hrxvgai.mongodb.net/chinook_dw


True
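The cell above keeps the MySQL and Atlas passwords in the notebook and prints the full connection string. As a minimal sketch of one alternative, the credentials could be read from environment variables and the URI printed with the password masked. The variable names `ATLAS_USER` and `ATLAS_PASSWORD` and the helper names are hypothetical; on Databricks a secret scope would be the more usual home for these values.

```python
import os
from urllib.parse import quote_plus


def load_atlas_credentials(env=os.environ):
    """Read the Atlas user name and password from a mapping (hypothetical variable names)."""
    return env["ATLAS_USER"], env["ATLAS_PASSWORD"]


def build_mongo_uri(user, password, cluster, database, mask=False):
    """Assemble an Atlas SRV URI; percent-escape credentials, or mask the password for printing."""
    secret = "****" if mask else quote_plus(password)
    return f"mongodb+srv://{quote_plus(user)}:{secret}@{cluster}.mongodb.net/{database}"


# Example with made-up credentials instead of the real environment.
example_env = {"ATLAS_USER": "demo_user", "ATLAS_PASSWORD": "p@ss/word"}
user, password = load_atlas_credentials(example_env)

mongo_uri = build_mongo_uri(user, password, "aidaclust.hrxvgai", "chinook_dw")
printable_uri = build_mongo_uri(user, password, "aidaclust.hrxvgai", "chinook_dw", mask=True)
print(printable_uri)
```

Escaping with `quote_plus` matters when a password contains characters such as `@` or `/`, which would otherwise break the URI.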

#### We will also define global functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection into a pandas-on-Spark DataFrame.'''
    # Create a client connection to MongoDB.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    # Query MongoDB, and fill a Python list with documents to create a DataFrame.
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        # Any other combination of arguments returns the whole collection.
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create (or replace) one MongoDB collection per JSON file and return the last insert result.'''
    # Create a client connection to MongoDB.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    # Read in each JSON file, and use it to create a new collection.
    result = None  # Stays None when json_files is empty.
    for name in json_files:
        db.drop_collection(name)
        json_file = os.path.join(src_file_path, json_files[name])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        result = db[name].insert_many(json_object)

    client.close()

    return result

#### We will populate dimensions by ingesting reference (cold-path) data
Here I will fetch reference data from an Azure MySQL database and create a new Databricks metadata database.

In [0]:
%sql
DROP DATABASE IF EXISTS chinook_d1h CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS chinook_d1h
COMMENT "DS-2002 FINAL DATABASE"
LOCATION "dbfs:/FileStore/final_data/chinook_d1h"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 final");

Here I will create a new table that sources its data from the date dimension table (dim_date) in the Azure MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://amd9asz-mysql2.mysql.database.azure.com:3306/chinook_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "amd9asz",    --Replace with your User Name
  password "Password123"  --Replace with your password
)
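The SQL views hardcode the server name, port, and database, although the global variables cell already defines `jdbc_hostname`, `jdbc_port`, and `src_database`. As a small sketch, assuming those same values, the JDBC URL could be assembled once in Python and reused. The helper name `build_jdbc_url` is hypothetical.

```python
def build_jdbc_url(hostname: str, port: int, database: str) -> str:
    """Assemble a MySQL JDBC URL from its parts."""
    return f"jdbc:mysql://{hostname}:{port}/{database}"


# Same values as in the global variables cell.
jdbc_hostname = "amd9asz-mysql2.mysql.database.azure.com"
jdbc_port = 3306
src_database = "chinook_dw"

jdbc_url = build_jdbc_url(jdbc_hostname, jdbc_port, src_database)
print(jdbc_url)
```

In a Databricks Python cell, the URL could then be passed to `spark.read.jdbc(url=jdbc_url, table="dim_date", properties=connection_properties)` instead of repeating the connection details in each view.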

In [0]:
%sql

USE DATABASE chinook_d1h;

CREATE OR REPLACE TABLE chinook_d1h.dim_date   
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/final_data/chinook_d1h/dim_date"
AS SELECT * FROM view_date


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_d1h.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM chinook_d1h.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


We will create a new table that sources genre dimension data from an Azure MySQL database

In [0]:
%sql
-- Create a Temporary View named "view_genre" that extracts data from your MySQL chinook database.

CREATE OR REPLACE TEMPORARY VIEW view_genre
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://amd9asz-mysql2.mysql.database.azure.com:3306/chinook_dw", --Replace with your Server Name
  dbtable "dim_genre",
  user "amd9asz",    --Replace with your User Name
  password "Password123"  --Replace with your password
)

In [0]:
%sql
USE DATABASE chinook_d1h;

CREATE OR REPLACE TABLE chinook_d1h.dim_genre
COMMENT "Genre Dimension Table"
LOCATION "dbfs:/FileStore/final_data/chinook_d1h/dim_genre"
AS SELECT * FROM view_genre

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED chinook_d1h.dim_genre;

col_name,data_type,comment
GenreKey,int,
Name,varchar(120),
,,
# Delta Statistics Columns,,
Column Names,"GenreKey, Name",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,chinook_d1h,


In [0]:
%sql
SELECT * FROM chinook_d1h.dim_genre LIMIT 5

GenreKey,Name
1,Rock
2,Jazz
3,Metal
4,Alternative & Punk
5,Rock And Roll


I will also fetch invoice dimension data in the same way


#### Fetching Data from MongoDB Atlas Database
We will load JSON data into MongoDB and then retrieve that data

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/final_data'

path,name,size,modificationTime
dbfs:/FileStore/final_data/batch/chinookdw_dim_customer.json,chinookdw_dim_customer.json,16807,1701965141000
dbfs:/FileStore/final_data/batch/chinookdw_dim_employee.json,chinookdw_dim_employee.json,2955,1701963332000
dbfs:/FileStore/final_data/batch/chinookdw_dim_invoice.json,chinookdw_dim_invoice.json,96185,1701968613000
dbfs:/FileStore/final_data/batch/chinookdw_dim_invoiceline.json,chinookdw_dim_invoiceline.json,205901,1701985715000


In [0]:
source_dir = '/dbfs/FileStore/final_data/batch'
json_files = {"employees" : 'chinookdw_dim_employee.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f0a5f836e40>

Now let's fetch that data from MongoDB

In [0]:
%scala
import com.mongodb.spark._

val userName = "amd9asz"
val pwd = "Password123"
val clusterName = "aidaclust.hrxvgai"
val atlas_uri = s"mongodb+srv://${userName}:${pwd}@${clusterName}.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_employee = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "chinook_dw")
.option("collection", "employees").load()
.select("EmployeeKey","LastName","FirstName","Title","ReportsTo","BirthDate","HireDate","Address","City","State","Country","PostalCode","Phone","Fax","Email")

display(df_employee)

EmployeeKey,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
1,Adams,Andrew,General Manager,0,1962-02-18 00:00:00,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
2,Edwards,Nancy,Sales Manager,1,1958-12-08 00:00:00,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com
3,Peacock,Jane,Sales Support Agent,2,1973-08-29 00:00:00,2002-04-01 00:00:00,1111 6 Ave SW,Calgary,AB,Canada,T2P 5M5,+1 (403) 262-3443,+1 (403) 262-6712,jane@chinookcorp.com
4,Park,Margaret,Sales Support Agent,2,1947-09-19 00:00:00,2003-05-03 00:00:00,683 10 Street SW,Calgary,AB,Canada,T2P 5G3,+1 (403) 263-4423,+1 (403) 263-4289,margaret@chinookcorp.com
5,Johnson,Steve,Sales Support Agent,2,1965-03-03 00:00:00,2003-10-17 00:00:00,7727B 41 Ave,Calgary,AB,Canada,T3B 1Y7,1 (780) 836-9987,1 (780) 836-9543,steve@chinookcorp.com
6,Mitchell,Michael,IT Manager,1,1973-07-01 00:00:00,2003-10-17 00:00:00,5827 Bowness Road NW,Calgary,AB,Canada,T3B 0C5,+1 (403) 246-9887,+1 (403) 246-9899,michael@chinookcorp.com
7,King,Robert,IT Staff,6,1970-05-29 00:00:00,2004-01-02 00:00:00,590 Columbia Boulevard West,Lethbridge,AB,Canada,T1K 5N8,+1 (403) 456-9986,+1 (403) 456-8485,robert@chinookcorp.com
8,Callahan,Laura,IT Staff,6,1968-01-09 00:00:00,2004-03-04 00:00:00,923 7 ST NW,Lethbridge,AB,Canada,T1H 1Y8,+1 (403) 467-3351,+1 (403) 467-8772,laura@chinookcorp.com


In [0]:
%scala
df_employee.printSchema()

Now we will create a new employee dimension table in the Databricks metadata database (chinook_d1h)

In [0]:
%scala

df_employee.write.format("delta").mode("overwrite").saveAsTable("chinook_d1h.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED chinook_d1h.dim_employee

col_name,data_type,comment
EmployeeKey,int,
LastName,string,
FirstName,string,
Title,string,
ReportsTo,int,
BirthDate,string,
HireDate,string,
Address,string,
City,string,
State,string,


In [0]:
%sql
SELECT * FROM chinook_d1h.dim_employee LIMIT 5

EmployeeKey,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
1,Adams,Andrew,General Manager,0,1962-02-18 00:00:00,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
2,Edwards,Nancy,Sales Manager,1,1958-12-08 00:00:00,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com
3,Peacock,Jane,Sales Support Agent,2,1973-08-29 00:00:00,2002-04-01 00:00:00,1111 6 Ave SW,Calgary,AB,Canada,T2P 5M5,+1 (403) 262-3443,+1 (403) 262-6712,jane@chinookcorp.com
4,Park,Margaret,Sales Support Agent,2,1947-09-19 00:00:00,2003-05-03 00:00:00,683 10 Street SW,Calgary,AB,Canada,T2P 5G3,+1 (403) 263-4423,+1 (403) 263-4289,margaret@chinookcorp.com
5,Johnson,Steve,Sales Support Agent,2,1965-03-03 00:00:00,2003-10-17 00:00:00,7727B 41 Ave,Calgary,AB,Canada,T3B 1Y7,1 (780) 836-9987,1 (780) 836-9543,steve@chinookcorp.com


#### Fetching Data from Cloud File System (json file)
We will be fetching customer data from a cloud file system

In [0]:
customer_json = f"{batch_dir}/chinookdw_dim_customer.json"

df_customer = spark.read.format('json').options(header = 'true', inferSchema = 'true').load(customer_json)

display(df_customer)

Address,City,Company,Country,CustomerKey,Email,Fax,FirstName,LastName,Phone,PostalCode,State,SupportRepId,_corrupt_record
,,,,,,,,,,,,,"[{""CustomerKey"":1, ""FirstName"":""Lu\u00eds"", ""LastName"":""Gon\u00e7alves"", ""Company"":""Embraer - Empresa Brasileira de Aeron\u00e1utica S.A."", ""Address"":""Av. Brigadeiro Faria Lima, 2170"", ""City"":""S\u00e3o Jos\u00e9 dos Campos"", ""State"":""SP"", ""Country"":""Brazil"", ""PostalCode"":""12227-000"", ""Phone"":""+55 (12) 3923-5555"", ""Fax"":""+55 (12) 3923-5566"", ""Email"":""luisg@embraer.com.br"", ""SupportRepId"":3},"
Theodor-Heuss-Straße 34,Stuttgart,,Germany,2.0,leonekohler@surfeu.de,,Leonie,Köhler,+49 0711 2842222,70174,,5.0,
1498 rue Bélanger,Montréal,,Canada,3.0,ftremblay@gmail.com,,François,Tremblay,+1 (514) 721-4711,H2G 1A7,QC,3.0,
Ullevålsveien 14,Oslo,,Norway,4.0,bjorn.hansen@yahoo.no,,Bjørn,Hansen,+47 22 44 22 22,0171,,4.0,
Klanova 9/506,Prague,JetBrains s.r.o.,Czech Republic,5.0,frantisekw@jetbrains.com,+420 2 4172 5555,František,Wichterlová,+420 2 4172 5555,14700,,4.0,
Rilská 3174/6,Prague,,Czech Republic,6.0,hholy@gmail.com,,Helena,Holý,+420 2 4177 0449,14300,,5.0,
"Rotenturmstraße 4, 1010 Innere Stadt",Vienne,,Austria,7.0,astrid.gruber@apple.at,,Astrid,Gruber,+43 01 5134505,1010,,5.0,
Grétrystraat 63,Brussels,,Belgium,8.0,daan_peeters@apple.be,,Daan,Peeters,+32 02 219 03 03,1000,,4.0,
Sønder Boulevard 51,Copenhagen,,Denmark,9.0,kara.nielsen@jubii.dk,,Kara,Nielsen,+453 3331 9991,1720,,4.0,
"Rua Dr. Falcão Filho, 155",São Paulo,Woodstock Discos,Brazil,10.0,eduardo@woodstock.com.br,+55 (11) 3033-4564,Eduardo,Martins,+55 (11) 3033-5446,01007-010,SP,4.0,


In [0]:
df_customer.printSchema()

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- CustomerKey: long (nullable = true)
 |-- Email: string (nullable = true)
 |-- Fax: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- State: string (nullable = true)
 |-- SupportRepId: long (nullable = true)
 |-- _corrupt_record: string (nullable = true)
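The `_corrupt_record` column and the missing first customer suggest that the file is a single JSON array spread over several lines, while the reader parses each line on its own. The sketch below imitates that behavior with the standard `json` module only; it is an analogy, not Spark itself, and the three-record sample is made up.

```python
import json

# Hypothetical file laid out the way the corrupt record suggests:
# one JSON array, one object per line.
sample = (
    '[{"CustomerKey": 1, "FirstName": "Luis"},\n'
    '{"CustomerKey": 2, "FirstName": "Leonie"},\n'
    '{"CustomerKey": 3, "FirstName": "Francois"}]'
)


def parse_line_by_line(text):
    """Parse each line independently, roughly like a JSON Lines reader."""
    decoder = json.JSONDecoder()
    parsed, corrupt = [], []
    for line in text.splitlines():
        try:
            # raw_decode reads the first JSON value and ignores what follows it.
            value, _ = decoder.raw_decode(line)
            parsed.append(value)
        except json.JSONDecodeError:
            corrupt.append(line)
    return parsed, corrupt


def parse_whole_document(text):
    """Parse the file as one JSON document."""
    return json.loads(text)


parsed, corrupt = parse_line_by_line(sample)
records = parse_whole_document(sample)
print(len(parsed), len(corrupt), len(records))
```

The first line opens the array and cannot be parsed alone, so that record is lost. In Spark, reading the file with `.option("multiLine", "true")` treats it as one document and should recover every record, assuming the file really has this layout.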



In [0]:
df_customer.write.format("delta").mode("overwrite").saveAsTable("chinook_d1h.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED chinook_d1h.dim_customer;

col_name,data_type,comment
Address,string,
City,string,
Company,string,
Country,string,
CustomerKey,bigint,
Email,string,
Fax,string,
FirstName,string,
LastName,string,
Phone,string,


In [0]:
%sql
SELECT * FROM chinook_d1h.dim_customer LIMIT 5;

Address,City,Company,Country,CustomerKey,Email,Fax,FirstName,LastName,Phone,PostalCode,State,SupportRepId,_corrupt_record
,,,,,,,,,,,,,"[{""CustomerKey"":1, ""FirstName"":""Lu\u00eds"", ""LastName"":""Gon\u00e7alves"", ""Company"":""Embraer - Empresa Brasileira de Aeron\u00e1utica S.A."", ""Address"":""Av. Brigadeiro Faria Lima, 2170"", ""City"":""S\u00e3o Jos\u00e9 dos Campos"", ""State"":""SP"", ""Country"":""Brazil"", ""PostalCode"":""12227-000"", ""Phone"":""+55 (12) 3923-5555"", ""Fax"":""+55 (12) 3923-5566"", ""Email"":""luisg@embraer.com.br"", ""SupportRepId"":3},"
Theodor-Heuss-Straße 34,Stuttgart,,Germany,2.0,leonekohler@surfeu.de,,Leonie,Köhler,+49 0711 2842222,70174,,5.0,
1498 rue Bélanger,Montréal,,Canada,3.0,ftremblay@gmail.com,,François,Tremblay,+1 (514) 721-4711,H2G 1A7,QC,3.0,
Ullevålsveien 14,Oslo,,Norway,4.0,bjorn.hansen@yahoo.no,,Bjørn,Hansen,+47 22 44 22 22,0171,,4.0,
Klanova 9/506,Prague,JetBrains s.r.o.,Czech Republic,5.0,frantisekw@jetbrains.com,+420 2 4172 5555,František,Wichterlová,+420 2 4172 5555,14700,,4.0,


Let's fetch invoice line dimension data the same way


In [0]:
invoice_json = f"{batch_dir}/chinookdw_dim_invoiceline.json"

df_invoice = spark.read.format('json').options(header = 'true', inferSchema = 'true').load(invoice_json)

display(df_invoice)

InvoiceKey,InvoiceLineKey,Quantity,TrackKey,UnitPrice,_corrupt_record
,,,,,"[{""InvoiceLineKey"":1, ""InvoiceKey"":1, ""TrackKey"":2, ""UnitPrice"":0.99, ""Quantity"":1},"
1.0,2.0,1.0,4.0,0.99,
2.0,3.0,1.0,6.0,0.99,
2.0,4.0,1.0,8.0,0.99,
2.0,5.0,1.0,10.0,0.99,
2.0,6.0,1.0,12.0,0.99,
3.0,7.0,1.0,16.0,0.99,
3.0,8.0,1.0,20.0,0.99,
3.0,9.0,1.0,24.0,0.99,
3.0,10.0,1.0,28.0,0.99,


In [0]:
df_invoice.printSchema()

root
 |-- InvoiceKey: long (nullable = true)
 |-- InvoiceLineKey: long (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- TrackKey: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [0]:
df_invoice.write.format("delta").mode("overwrite").saveAsTable("chinook_d1h.dim_invoiceline")

In [0]:
%sql
DESCRIBE EXTENDED chinook_d1h.dim_invoiceline


col_name,data_type,comment
InvoiceKey,bigint,
InvoiceLineKey,bigint,
Quantity,bigint,
TrackKey,bigint,
UnitPrice,double,
_corrupt_record,string,
,,
# Delta Statistics Columns,,
Column Names,"Quantity, UnitPrice, TrackKey, InvoiceKey, InvoiceLineKey, _corrupt_record",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM chinook_d1h.dim_invoiceline LIMIT 5;

InvoiceKey,InvoiceLineKey,Quantity,TrackKey,UnitPrice,_corrupt_record
,,,,,"[{""InvoiceLineKey"":1, ""InvoiceKey"":1, ""TrackKey"":2, ""UnitPrice"":0.99, ""Quantity"":1},"
1.0,2.0,1.0,4.0,0.99,
2.0,3.0,1.0,6.0,0.99,
2.0,4.0,1.0,8.0,0.99,
2.0,5.0,1.0,10.0,0.99,


### Now we verify our dimension tables

In [0]:
%sql

USE chinook_d1h;
SHOW TABLES

database,tableName,isTemporary
chinook_d1h,dim_customer,False
chinook_d1h,dim_date,False
chinook_d1h,dim_employee,False
chinook_d1h,dim_genre,False
chinook_d1h,dim_invoiceline,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True


#### We will now integrate reference data with real-time data

Using Auto Loader to process streaming (hot-path) fact data
Bronze table: process raw JSON data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 # A repeated option key overwrites the previous value, so all hints go in one string.
 .option("cloudFiles.schemaHints",
         "InvoiceKey BIGINT, InvoiceDate STRING, CustomerKey BIGINT, BillingState STRING, "
         "BillingPostalCode STRING, BillingCountry STRING, BillingCity STRING, BillingAddress STRING")
 .option("cloudFiles.schemaLocation", inv_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inv_stream_dir)
 .createOrReplaceTempView("invoices_raw_tempview"))
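Reader options are stored per key, so setting `cloudFiles.schemaHints` more than once keeps only the last value; hints for several columns belong in one comma-separated string. The sketch below imitates the option map with a plain dictionary and builds the combined string. The helper name `build_schema_hints` is hypothetical.

```python
def build_schema_hints(column_types):
    """Join column/type pairs into one comma-separated schema hints string."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in column_types.items())


hints = {
    "InvoiceKey": "BIGINT",
    "InvoiceDate": "STRING",
    "CustomerKey": "BIGINT",
    "BillingState": "STRING",
    "BillingPostalCode": "STRING",
    "BillingCountry": "STRING",
    "BillingCity": "STRING",
    "BillingAddress": "STRING",
}

# Imitate repeated .option(key, value) calls on a key-value option map.
options = {}
for name, sql_type in hints.items():
    options["cloudFiles.schemaHints"] = f"{name} {sql_type}"
last_only = options["cloudFiles.schemaHints"]

combined = build_schema_hints(hints)
print(last_only)
print(combined)
```

Here the inferred types happen to match the hints, which is probably why the stream output looks the same either way.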


In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW invoices_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM invoices_raw_tempview
)

In [0]:
%sql
SELECT * FROM invoices_bronze_tempview

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,BillingState,CustomerKey,InvoiceDate,InvoiceKey,Total,_rescued_data,receipt_time,source_file
319 N. Frances Street,Madison,USA,53703,WI,25,2011-05-29 00:00:00,201,18.86,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
"4, Rue Milton",Paris,France,75009,,39,2011-06-06 00:00:00,202,1.99,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
"8, Rue Hanovre",Paris,France,75002,,40,2011-06-19 00:00:00,203,2.98,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
"9, Place Louis Barthou",Bordeaux,France,33000,,42,2011-06-19 00:00:00,204,3.98,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
Porthaninkatu 9,Helsinki,Finland,00530,,44,2011-06-20 00:00:00,205,7.96,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
Lijnbaansgracht 120bg,Amsterdam,Netherlands,1016,VV,48,2011-06-21 00:00:00,206,8.94,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
110 Raeburn Pl,Edinburgh,United Kingdom,EH4 1HH,,54,2011-06-24 00:00:00,207,8.91,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
Ullevålsveien 14,Oslo,Norway,0171,,4,2011-06-29 00:00:00,208,15.86,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
627 Broadway,New York,USA,10012-2612,NY,18,2011-07-07 00:00:00,209,0.99,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
1 Infinite Loop,Cupertino,USA,95014,CA,19,2011-07-20 00:00:00,210,1.98,,2023-12-08T02:02:27.273Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json


In [0]:

(spark.table("invoices_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inv_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_invoices_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f0a5f7d8820>

In [0]:
(spark.readStream
  .table("fact_invoices_bronze")
  .createOrReplaceTempView("invoices_silver_tempview"))

In [0]:
%sql
SELECT * FROM invoices_silver_tempview

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,BillingState,CustomerKey,InvoiceDate,InvoiceKey,Total,_rescued_data,receipt_time,source_file
319 N. Frances Street,Madison,USA,53703,WI,25,2011-05-29 00:00:00,201,18.86,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
"4, Rue Milton",Paris,France,75009,,39,2011-06-06 00:00:00,202,1.99,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
"8, Rue Hanovre",Paris,France,75002,,40,2011-06-19 00:00:00,203,2.98,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
"9, Place Louis Barthou",Bordeaux,France,33000,,42,2011-06-19 00:00:00,204,3.98,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
Porthaninkatu 9,Helsinki,Finland,00530,,44,2011-06-20 00:00:00,205,7.96,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
Lijnbaansgracht 120bg,Amsterdam,Netherlands,1016,VV,48,2011-06-21 00:00:00,206,8.94,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
110 Raeburn Pl,Edinburgh,United Kingdom,EH4 1HH,,54,2011-06-24 00:00:00,207,8.91,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
Ullevålsveien 14,Oslo,Norway,0171,,4,2011-06-29 00:00:00,208,15.86,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
627 Broadway,New York,USA,10012-2612,NY,18,2011-07-07 00:00:00,209,0.99,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json
1 Infinite Loop,Cupertino,USA,95014,CA,19,2011-07-20 00:00:00,210,1.98,,2023-12-08T02:02:29.822Z,dbfs:/FileStore/final_data/stream/invoice_details/chinook_invoice_details_3.json


In [0]:
%sql
DESCRIBE EXTENDED invoices_silver_tempview

col_name,data_type,comment
BillingAddress,string,
BillingCity,string,
BillingCountry,string,
BillingPostalCode,string,
BillingState,string,
CustomerKey,bigint,
InvoiceDate,string,
InvoiceKey,bigint,
Total,double,
_rescued_data,string,


In [0]:
%sql
  CREATE OR REPLACE TEMPORARY VIEW fact_invoices_silver_tempview AS (
  SELECT
    i.InvoiceKey,
    i.CustomerKey,
    L.InvoiceLineKey,
    L.TrackKey,
    i.Total AS invoice_total,
    c.FirstName AS customer_first_name,
    c.LastName AS customer_last_name,
    c.email AS customer_email,
    L.UnitPrice AS unit_price,
    L.Quantity AS quantity,
    i.InvoiceDate AS purchase_date,
    d.day_name_of_week AS returned_day_name_of_week,
    d.day_of_month AS returned_day_of_month,
    d.weekday_weekend AS returned_weekday_weekend,
    d.month_name AS returned_month_name,
    d.calendar_quarter AS returned_calendar_quarter,
    d.calendar_year AS returned_calendar_year
  FROM
    invoices_silver_tempview AS i
    INNER JOIN chinook_d1h.dim_customer AS c 
    ON c.Customerkey = i.CustomerKey
    INNER JOIN chinook_d1h.dim_invoiceline AS L 
    ON L.InvoiceKey = i.InvoiceKey    
    LEFT OUTER JOIN chinook_d1h.dim_date AS d 
    ON d.date_key = i.InvoiceDate
)

In [0]:
(spark.table("fact_invoices_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inv_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_invoices_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f0a5f604bb0>

In [0]:
%sql
SELECT * FROM fact_invoices_silver

InvoiceKey,CustomerKey,InvoiceLineKey,TrackKey,invoice_total,customer_first_name,customer_last_name,customer_email,unit_price,quantity,purchase_date,returned_day_name_of_week,returned_day_of_month,returned_weekday_weekend,returned_month_name,returned_calendar_quarter,returned_calendar_year
201,25,1099,3209,18.86,Victor,Stevens,vstevens@yahoo.com,1.99,1,2011-05-29 00:00:00,,,,,,
202,39,1100,3223,1.99,Camille,Bernard,camille.bernard@yahoo.fr,1.99,1,2011-06-06 00:00:00,,,,,,
203,40,1102,3225,2.98,Dominique,Lefebvre,dominiquelefebvre@gmail.com,0.99,1,2011-06-19 00:00:00,,,,,,
204,42,1104,3229,3.98,Wyatt,Girard,wyatt.girard@yahoo.fr,1.99,1,2011-06-19 00:00:00,,,,,,
205,44,1108,3237,7.96,Terhi,Hämäläinen,terhi.hamalainen@apple.fi,1.99,1,2011-06-20 00:00:00,,,,,,
206,48,1114,3261,8.94,Johannes,Van der Berg,johavanderberg@yahoo.nl,0.99,1,2011-06-21 00:00:00,,,,,,
207,54,1123,3315,8.91,Steve,Murray,steve.murray@yahoo.uk,0.99,1,2011-06-24 00:00:00,,,,,,
208,4,1137,3441,15.86,Bjørn,Hansen,bjorn.hansen@yahoo.no,0.99,1,2011-06-29 00:00:00,,,,,,
209,18,1138,3455,0.99,Michelle,Brooks,michelleb@aol.com,0.99,1,2011-07-07 00:00:00,,,,,,
210,19,1140,3457,1.98,Tim,Goyer,tgoyer@apple.com,0.99,1,2011-07-20 00:00:00,,,,,,
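The date columns in the output above are all empty. A likely cause is the join condition: `date_key` is an integer such as 20000101, while `InvoiceDate` is a string such as `2011-05-29 00:00:00`, so the two never match and the LEFT OUTER JOIN keeps each invoice row with NULL date attributes. The sketch below shows the conversion in plain Python, with a made-up two-row slice of `dim_date`; the helper name is hypothetical.

```python
from datetime import datetime


def invoice_date_to_date_key(invoice_date: str) -> int:
    """Turn '2011-05-29 00:00:00' into the integer key 20110529."""
    parsed = datetime.strptime(invoice_date, "%Y-%m-%d %H:%M:%S")
    return int(parsed.strftime("%Y%m%d"))


# Hypothetical slice of dim_date, keyed by date_key.
dim_date = {
    20110529: {"day_name_of_week": "Sunday", "month_name": "May"},
    20110606: {"day_name_of_week": "Monday", "month_name": "June"},
}

raw_value = "2011-05-29 00:00:00"
unmatched = dim_date.get(raw_value)                           # raw string never matches
matched = dim_date.get(invoice_date_to_date_key(raw_value))   # converted key matches
print(unmatched, matched)
```

In the SQL view, an equivalent condition might be `d.date_key = CAST(date_format(to_date(i.InvoiceDate), 'yyyyMMdd') AS INT)`. This is untested here and assumes `dim_date` covers the invoice dates.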


Now let's perform gold table aggregations

In [0]:
%sql
SELECT 
c.FirstName, c.LastName AS customer_name, 
c.Email, i.purchase_date,
SUM (i.invoice_total) as total_amt_purchased
FROM chinook_d1h.`fact_invoices_silver` AS i
INNER JOIN chinook_d1h.dim_customer AS c
ON i.CustomerKey = c.CustomerKey
GROUP BY c.FirstName, c.LastName, c.Email, i.purchase_date
ORDER BY total_amt_purchased DESC;

-- This aggregate query returns each customer's full name and email together with their invoice total and purchase date for a series of purchases from our data warehouse. Please see the attached pivot chart in the visualizations tab. It shows on which days certain customers spent more money and which customers spent the largest amounts.


FirstName,customer_name,Email,purchase_date,total_amt_purchased
Helena,Holý,hholy@gmail.com,2013-11-13 00:00:00,362.0400000000001
Richard,Cunningham,ricunningham@hotmail.com,2012-08-05 00:00:00,334.0400000000001
Ladislav,Kovács,ladislav_kovacs@apple.hu,2010-02-18 00:00:00,306.0400000000001
Hugh,O'Reilly,hughoreilly@apple.ie,2011-04-28 00:00:00,306.0400000000001
Astrid,Gruber,astrid.gruber@apple.at,2010-01-18 00:00:00,264.0400000000001
Victor,Stevens,vstevens@yahoo.com,2011-05-29 00:00:00,264.0400000000001
František,Wichterlová,frantisekw@jetbrains.com,2012-09-05 00:00:00,236.04000000000008
Isabelle,Mercier,isabelle_mercier@apple.fr,2012-10-06 00:00:00,236.04000000000008
Frank,Ralston,fralston@gmail.com,2010-03-21 00:00:00,222.04000000000008
Bjørn,Hansen,bjorn.hansen@yahoo.no,2011-06-29 00:00:00,222.04000000000008
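The totals above are larger than the invoice totals seen in the stream, for example 264.04 for Victor Stevens on 2011-05-29 against an invoice total of 18.86. One possible explanation is that `fact_invoices_silver` holds one row per invoice line, each repeating the invoice total, so `SUM(invoice_total)` counts the total once per line (18.86 × 14 = 264.04). The sketch below reproduces that effect with made-up rows; the count of 14 lines is an assumption inferred from the numbers, not read from the data.

```python
from collections import defaultdict

# Hypothetical fact rows: one row per invoice line, each repeating the invoice total.
fact_rows = [
    {"InvoiceKey": 201, "CustomerKey": 25, "invoice_total": 18.86}
    for _ in range(14)
] + [
    {"InvoiceKey": 202, "CustomerKey": 39, "invoice_total": 1.99}
]


def sum_per_line(rows):
    """Sum invoice_total over every row, as the aggregate query does."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["CustomerKey"]] += row["invoice_total"]
    return dict(totals)


def sum_per_invoice(rows):
    """Keep one total per InvoiceKey first, then sum by customer."""
    invoice_totals = {}
    for row in rows:
        invoice_totals[row["InvoiceKey"]] = (row["CustomerKey"], row["invoice_total"])
    totals = defaultdict(float)
    for customer_key, total in invoice_totals.values():
        totals[customer_key] += total
    return dict(totals)


per_line = sum_per_line(fact_rows)
per_invoice = sum_per_invoice(fact_rows)
print(round(per_line[25], 2), per_invoice[25])
```

If the goal is the amount each customer actually paid, summing `unit_price * quantity` per line, or de-duplicating by `InvoiceKey` before summing, would avoid the repetition.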


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
#To clean up the folder
#%fs rm -r /FileStore/final_data/chinook_d1h