## DS 2002 Capstone Project 
### By Emma Hickey

**Importing the necessary libraries**

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

**Instantiate Global Variables**

In [0]:
# Azure MySQL Server Connection Information
jdbc_hostname = "jzm6ee-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "music_data_mart"

connection_properties = {
  "user" : "ehickey",
  "password" : "Greenriver1752!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information 
atlas_cluster_name = "Cluster0.kpk6z"
atlas_database_name = "Chinook_purchasing"
atlas_user_name = "emmahickey1752"
atlas_password = "p.P43rrC-srbub9"

# Data Files (JSON) Information 
dst_database = "chinook_dlh"

base_dir = "dbfs:/FileStore/final_project_data"
#base_dir = "dbfs:/user/jzm6ee@virginia.edu/final_project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sales"
batch_dir = f"{data_dir}/batch2"
stream_dir = f"{data_dir}/stream2"

music_sales_stream_dir = f"{stream_dir}/music_sales"

output_bronze = f"{database_dir}/fact_music_sales_orders/bronze"
output_silver = f"{database_dir}/fact_music_sales_orders/silver"
output_gold   = f"{database_dir}/fact_music_sales_orders/gold"

# Delete the Streaming Files 
dbutils.fs.rm(f"{database_dir}/fact_music_sales_orders", True)

# Delete the Database Files 
dbutils.fs.rm(database_dir, True)

True

### Define Global Functions

In [0]:

# Function for fetching a dataframe from Azure SQL database server
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure MySQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


#Function for fetching a datafram from MongoDB Atlas database server Using PyMongo
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# Function for creating new collections by uploading JSON file(s) to MongoDB Atlas server
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Populating dimensions by ingesting reference (cold-path) data
#### Fetching reference data from Azure MySQL database
##### Creating a new databricks metadata database, and then creating a new table which sources its data from a view in an azure MySQL databse

In [0]:
%sql
DROP DATABASE IF EXISTS chinook_dlh CASCADE; /*dropping chinook database if its there and cascade ensures that everythings delted that is contained in the database*/

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS chinook_dlh /*Creates the chinook_dlh datalakehouse*/
COMMENT "Capstone Project Database"/*comments that it is my capstone project*/
LOCATION "dbfs:/FileStore/final_project_data/chinook_dlh" /*specifies where to put the location of the database(in final_project_data folder)*/
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project"); /* Notes that it contains personally identifiable ifnormation and that the purpose if for the capstone project*/

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_album /*Creating temporary view called view_album that gets data from mySQL server and brings to databricks cluster*/
USING org.apache.spark.sql.jdbc /*specifying data source type*/
OPTIONS (
  url "jdbc:mysql://jzm6ee-mysql.mysql.database.azure.com:3306/chinook",
  dbtable "album",
  user "ehickey",
  password "Greenriver1752!"
)/*specifys url, database table, my username, and password, for my sql azure data base server*/

In [0]:
%sql
USE DATABASE chinook_dlh; /*using database (chinook_dlh) which I just created*/

CREATE TABLE IF NOT EXISTS chinook_dlh.dim_album /*creating album dimension table in chinook data lakehouse*/
COMMENT "Album Dimension Table" /*commenting table name*/
LOCATION "dbfs:/FileStore/final_project_data/chinook_dlh/dim_album" /*specifying location for the file*/
AS SELECT * FROM view_album
/*selecting the data from view album*/

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_album LIMIT 5 /* proving that the data is in table*/

AlbumId,Title,ArtistId
1,For Those About To Rock We Salute You,1
2,Balls to the Wall,2
3,Restless and Wild,2
4,Let There Be Rock,1
5,Big Ones,3


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_album; /*describing data types in table*/

col_name,data_type,comment
AlbumId,int,
Title,varchar(160),
ArtistId,int,
,,
# Delta Statistics Columns,,
Column Names,"AlbumId, Title, ArtistId",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_invoice_line /*Creating temporary view called view_album that gets data from mySQL server and brings to databricks cluster*/
USING org.apache.spark.sql.jdbc /*specifying data source type*/
OPTIONS (
  url "jdbc:mysql://jzm6ee-mysql.mysql.database.azure.com:3306/chinook",
  dbtable "invoiceline",
  user "ehickey",
  password "Greenriver1752!"
)/*specifys url, database table, my username, and password, for my sql azure data base server*/

In [0]:
%sql
USE DATABASE chinook_dlh;/*using chinook_dlh database*/

CREATE TABLE IF NOT EXISTS chinook_dlh.dim_invoice_line/*creating invoice line dimension table in chinook data lakehouse*/
COMMENT "Invoice Line Dimension Table" /*commenting table name*/
LOCATION "dbfs:/FileStore/final_project_data/chinook_dlh/dim_invoice_line" /*specifying location for the file*/
AS SELECT * FROM view_invoice_line /*selecting the data from view invoice line*/

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_invoice_line LIMIT 5 /* proving that the data is in table*/

InvoiceLineId,InvoiceId,TrackId,UnitPrice,Quantity
1,1,2,0.99,1
2,1,4,0.99,1
3,2,6,0.99,1
4,2,8,0.99,1
5,2,10,0.99,1


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_invoice_line; /*describing data types in table*/

col_name,data_type,comment
InvoiceLineId,int,
InvoiceId,int,
TrackId,int,
UnitPrice,"decimal(10,2)",
Quantity,int,
,,
# Delta Statistics Columns,,
Column Names,"Quantity, UnitPrice, InvoiceId, TrackId, InvoiceLineId",
Column Selection Method,first-32,
,,


##### Creating a new table that sources its data from a table in an azure MySQL database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date/*Creating temporary view called view_date that gets data from mySQL server and brings to databricks cluster*/
USING org.apache.spark.sql.jdbc /*specifying data source type*/
OPTIONS (
  url "jdbc:mysql://jzm6ee-mysql.mysql.database.azure.com:3306/music_data_mart",
  dbtable "dim_date",
  user "ehickey",
  password "Greenriver1752!"
)/*specifys url, database table, my username, and password, for my sql azure data base server*/

In [0]:
%sql
USE DATABASE chinook_dlh;/*using chinook_dlh database*/

CREATE TABLE IF NOT EXISTS chinook_dlh.dim_date/*creating date dimension table in chinook data lakehouse*/
COMMENT "Date Dimension Table"/*commenting table name*/
LOCATION "dbfs:/FileStore/final_project_data/chinook_dlh/dim_date"
AS SELECT * FROM view_date/*selecting the data from view date*/

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_date LIMIT 5 /* proving that the data is in table*/

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_date; /*describing data types in table*/

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


#### Fetching reference data from a MongoDB atlas databse
##### Viewing the data files on the databricks file system 

In [0]:
display(dbutils.fs.ls(batch_dir))#showing the files in the directory and info about them

path,name,size,modificationTime
dbfs:/FileStore/final_project_data/sales/batch2/chinookalbumtable.json,chinookalbumtable.json,28790,1732658600000
dbfs:/FileStore/final_project_data/sales/batch2/chinookartisttable.csv,chinookartisttable.csv,7434,1732658601000
dbfs:/FileStore/final_project_data/sales/batch2/chinookcustomertable.json,chinookcustomertable.json,20081,1732658599000
dbfs:/FileStore/final_project_data/sales/batch2/chinookemployeetable.csv,chinookemployeetable.csv,1638,1732658600000
dbfs:/FileStore/final_project_data/sales/batch2/chinookgenretable.json,chinookgenretable.json,1193,1732658601000
dbfs:/FileStore/final_project_data/sales/batch2/chinookinvoicelinetable.json,chinookinvoicelinetable.json,110939,1732658600000
dbfs:/FileStore/final_project_data/sales/batch2/chinookinvoicetable.csv,chinookinvoicetable.csv,34328,1732658600000
dbfs:/FileStore/final_project_data/sales/batch2/chinookmediatypetable.csv,chinookmediatypetable.csv,146,1732658600000
dbfs:/FileStore/final_project_data/sales/batch2/chinookplaylisttable.json,chinookplaylisttable.json,969,1732658601000
dbfs:/FileStore/final_project_data/sales/batch2/chinookplaylisttracktable.csv,chinookplaylisttracktable.csv,5912,1732658599000


##### Creating a New MongoDB Database, and Loading JSON Data Into a New MongoDB Collection


In [0]:
source_dir = '/dbfs/FileStore/final_project_data/sales/batch2'#specifying directory to read files from 
json_files = {"customer" : 'chinookcustomertable.json'} ##specifying the customer table json file with key

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)# using function and arguments for atlas info to read file from my source directory and adding it to the MongoDB collection 

<pymongo.results.InsertManyResult at 0x7f5c5852e440>

##### Fetching Data from the New MongoDB Collection

In [0]:
%scala 
/* Using scala language to create new variables to hold the username, password, clustername, and the altas uri*/
import com.mongodb.spark._ 

val userName = "emmahickey1752"
val pwd = "p.P43rrC-srbub9"
val clusterName = "Cluster0.kpk6z"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"



In [0]:
%scala
/*Getting info from customer collection and then using .select to get the columns in my prefered order*/
import com.mongodb.spark._ 

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "Chinook_purchasing")
.option("collection", "customer")
.option("uri", atlas_uri).load()
.select("CustomerId","Company","LastName","FirstName","Phone","Address","City","Fax","PostalCode","Email")
display(df_customer)





CustomerId,Company,LastName,FirstName,Phone,Address,City,Fax,PostalCode,Email
1,Embraer - Empresa Brasileira de Aeronáutica S.A.,Gonçalves,Luís,+55 (12) 3923-5555,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,+55 (12) 3923-5566,12227-000,luisg@embraer.com.br
2,,Köhler,Leonie,+49 0711 2842222,Theodor-Heuss-Straße 34,Stuttgart,,70174,leonekohler@surfeu.de
3,,Tremblay,François,+1 (514) 721-4711,1498 rue Bélanger,Montréal,,H2G 1A7,ftremblay@gmail.com
4,,Hansen,Bjørn,+47 22 44 22 22,Ullevålsveien 14,Oslo,,0171,bjorn.hansen@yahoo.no
5,JetBrains s.r.o.,Wichterlová,František,+420 2 4172 5555,Klanova 9/506,Prague,+420 2 4172 5555,14700,frantisekw@jetbrains.com
6,,Holý,Helena,+420 2 4177 0449,Rilská 3174/6,Prague,,14300,hholy@gmail.com
7,,Gruber,Astrid,+43 01 5134505,"Rotenturmstraße 4, 1010 Innere Stadt",Vienne,,1010,astrid.gruber@apple.at
8,,Peeters,Daan,+32 02 219 03 03,Grétrystraat 63,Brussels,,1000,daan_peeters@apple.be
9,,Nielsen,Kara,+453 3331 9991,Sønder Boulevard 51,Copenhagen,,1720,kara.nielsen@jubii.dk
10,Woodstock Discos,Martins,Eduardo,+55 (11) 3033-5446,"Rua Dr. Falcão Filho, 155",São Paulo,+55 (11) 3033-4564,01007-010,eduardo@woodstock.com.br


In [0]:
%scala
df_customer.printSchema() /*showing dataframe structure*/

##### Using spark dataframe to Create a new table in the databricks metadata database called chinook_dlh

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_customer") /* writing customer dataframe to a delta table and saving it as the customer dimension in chinook data lakehouse*/

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_customer /*describing data types in customer dimension table*/

col_name,data_type,comment
CustomerId,int,
Company,string,
LastName,string,
FirstName,string,
Phone,string,
Address,string,
City,string,
Fax,string,
PostalCode,string,
Email,string,


##### Querying new table in the databricks metadata database 

In [0]:
%sql
SELECT * FROM chinook_dlh.dim_customer LIMIT 5 /*querying table to prove I have data*/

CustomerId,Company,LastName,FirstName,Phone,Address,City,Fax,PostalCode,Email
1,Embraer - Empresa Brasileira de Aeronáutica S.A.,Gonçalves,Luís,+55 (12) 3923-5555,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,+55 (12) 3923-5566,12227-000,luisg@embraer.com.br
2,,Köhler,Leonie,+49 0711 2842222,Theodor-Heuss-Straße 34,Stuttgart,,70174,leonekohler@surfeu.de
3,,Tremblay,François,+1 (514) 721-4711,1498 rue Bélanger,Montréal,,H2G 1A7,ftremblay@gmail.com
4,,Hansen,Bjørn,+47 22 44 22 22,Ullevålsveien 14,Oslo,,0171,bjorn.hansen@yahoo.no
5,JetBrains s.r.o.,Wichterlová,František,+420 2 4172 5555,Klanova 9/506,Prague,+420 2 4172 5555,14700,frantisekw@jetbrains.com


#### Fetching the data from file system 
##### Using PySpark to read from csv file

In [0]:
employee_csv = f"{batch_dir}/chinookemployeetable.csv" #specifying the employee table csv file in the batch directory

df_employee = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv) # reading data and specifying the csv format , and to infer data types and then passing it a reference to file
display(df_employee) # showing employee dataframe


EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
1,Adams,Andrew,General Manager,,1962-02-18T00:00:00Z,2002-08-14T00:00:00Z,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
2,Edwards,Nancy,Sales Manager,1.0,1958-12-08T00:00:00Z,2002-05-01T00:00:00Z,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com
3,Peacock,Jane,Sales Support Agent,2.0,1973-08-29T00:00:00Z,2002-04-01T00:00:00Z,1111 6 Ave SW,Calgary,AB,Canada,T2P 5M5,+1 (403) 262-3443,+1 (403) 262-6712,jane@chinookcorp.com
4,Park,Margaret,Sales Support Agent,2.0,1947-09-19T00:00:00Z,2003-05-03T00:00:00Z,683 10 Street SW,Calgary,AB,Canada,T2P 5G3,+1 (403) 263-4423,+1 (403) 263-4289,margaret@chinookcorp.com
5,Johnson,Steve,Sales Support Agent,2.0,1965-03-03T00:00:00Z,2003-10-17T00:00:00Z,7727B 41 Ave,Calgary,AB,Canada,T3B 1Y7,1 (780) 836-9987,1 (780) 836-9543,steve@chinookcorp.com
6,Mitchell,Michael,IT Manager,1.0,1973-07-01T00:00:00Z,2003-10-17T00:00:00Z,5827 Bowness Road NW,Calgary,AB,Canada,T3B 0C5,+1 (403) 246-9887,+1 (403) 246-9899,michael@chinookcorp.com
7,King,Robert,IT Staff,6.0,1970-05-29T00:00:00Z,2004-01-02T00:00:00Z,590 Columbia Boulevard West,Lethbridge,AB,Canada,T1K 5N8,+1 (403) 456-9986,+1 (403) 456-8485,robert@chinookcorp.com
8,Callahan,Laura,IT Staff,6.0,1968-01-09T00:00:00Z,2004-03-04T00:00:00Z,923 7 ST NW,Lethbridge,AB,Canada,T1H 1Y8,+1 (403) 467-3351,+1 (403) 467-8772,laura@chinookcorp.com


In [0]:
df_employee.printSchema() #showing dataframe structure

root
 |-- EmployeeId: integer (nullable = true)
 |-- LastName: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ReportsTo: string (nullable = true)
 |-- BirthDate: timestamp (nullable = true)
 |-- HireDate: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Fax: string (nullable = true)
 |-- Email: string (nullable = true)



In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_employee") #writing the employee dataframe to d a delta table and saving it as the emplyee dimension in chinook data lakehouse

In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_employee; /*describing data types in employee dimension table */

col_name,data_type,comment
EmployeeId,int,
LastName,string,
FirstName,string,
Title,string,
ReportsTo,string,
BirthDate,timestamp,
HireDate,timestamp,
Address,string,
City,string,
State,string,


In [0]:
%sql
SELECT * FROM chinook_dlh.dim_employee LIMIT 5; /*proving I can query rows from table*/

EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
1,Adams,Andrew,General Manager,,1962-02-18T00:00:00Z,2002-08-14T00:00:00Z,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
2,Edwards,Nancy,Sales Manager,1.0,1958-12-08T00:00:00Z,2002-05-01T00:00:00Z,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com
3,Peacock,Jane,Sales Support Agent,2.0,1973-08-29T00:00:00Z,2002-04-01T00:00:00Z,1111 6 Ave SW,Calgary,AB,Canada,T2P 5M5,+1 (403) 262-3443,+1 (403) 262-6712,jane@chinookcorp.com
4,Park,Margaret,Sales Support Agent,2.0,1947-09-19T00:00:00Z,2003-05-03T00:00:00Z,683 10 Street SW,Calgary,AB,Canada,T2P 5G3,+1 (403) 263-4423,+1 (403) 263-4289,margaret@chinookcorp.com
5,Johnson,Steve,Sales Support Agent,2.0,1965-03-03T00:00:00Z,2003-10-17T00:00:00Z,7727B 41 Ave,Calgary,AB,Canada,T3B 1Y7,1 (780) 836-9987,1 (780) 836-9543,steve@chinookcorp.com


##### Verifying dimension tables 

In [0]:
%sql
USE chinook_dlh; /*using the chinnok data lakehouse and showing tables in it*/
SHOW TABLES

database,tableName,isTemporary
chinook_dlh,dim_album,False
chinook_dlh,dim_customer,False
chinook_dlh,dim_date,False
chinook_dlh,dim_employee,False
chinook_dlh,dim_invoice_line,False
,_sqldf,True
,view_album,True
,view_date,True
,view_invoice_line,True


### Integrating reference data with real-time data
#### Using AutoLoader to process streaming(hot path) data
##### Bronze Table: Process 'Raw' JSON Data

In [0]:

(spark.readStream
 .format("cloudFiles")# .format cloudfiles means its autoloader so that files are ingested as they hit the directory
 .option("cloudFiles.format", "json") # saying they are json files with the cloudfiles.format option
 #using the .option to identify what schema needs to be
 .option("cloudFiles.schemaHints", "fact_music_sale_key BIGINT")
 .option("cloudFiles.schemaHints", "InvoiceId BIGINT")
 .option("cloudFiles.schemaHints", "CustomerId BIGINT")
 .option("cloudFiles.schemaHints", "InvoiceKey BIGINT") 
 .option("cloudFiles.schemaHints", "InvoiceLineId BIGINT")
 .option("cloudFiles.schemaHints", "CustomerKey BIGINT")
 .option("cloudFiles.schemaHints", "InvoiceDateKey BIGINT")
 .option("cloudFiles.schemaHints", "TrackId BIGINT")
 .option("cloudFiles.schemaHints", "BillingPostalCode STRING")
 .option("cloudFiles.schemaHints", "BillingCity STRING")
 .option("cloudFiles.schemaHints", "BillingCountry STRING")
 .option("cloudFiles.schemaHints", "Total FLOAT")
 .option("cloudFiles.schemaHints", "UnitPrice FLOAT")
 .option("cloudFiles.schemaHints", "Quantity INT")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")#format for data is multiline, there is a new json document on each line in that json file, so must specify multiline is true
 .load(music_sales_stream_dir)#load music sales stream directory data
 .createOrReplaceTempView("music_sales_raw_tempview")) #create or replace temprorary view, getting data but not really doing anything to it



In [0]:
%sql
/* Adding Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW music_sales_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM music_sales_raw_tempview
)
/*creating temporary view and adding the metadata, but is streaming not running quite yet*/

In [0]:
%sql
SELECT * FROM music_sales_bronze_tempview /*displaying data in the music sales bronze tempview*/

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,CustomerId,CustomerKey,InvoiceDateKey,InvoiceId,InvoiceKey,InvoiceLineId,Quantity,Total,TrackId,UnitPrice,fact_music_sale_key,row_num,total_count,_rescued_data,receipt_time,source_file
700 W Pender Street,Vancouver,Canada,V6C 1G8,15,15,20102443,276,276,1494,1,5.94,2078,0.99,1494,1494,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1495,1,8.91,2084,0.99,1495,1495,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1496,1,8.91,2090,0.99,1496,1496,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1497,1,8.91,2096,0.99,1497,1497,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1498,1,8.91,2102,0.99,1498,1498,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1499,1,8.91,2108,0.99,1499,1499,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1500,1,8.91,2114,0.99,1500,1500,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1501,1,8.91,2120,0.99,1501,1501,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1502,1,8.91,2126,0.99,1502,1502,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1503,1,8.91,2132,0.99,1503,1503,2240,,2024-12-06T04:37:30.742Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json


In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW music_sales_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM music_sales_raw_tempview
)
/*creating orders bronze tmepview and adding metadata*/

In [0]:
%sql
SELECT * FROM music_sales_bronze_tempview /*diplaying data from music sales bronze tempview*/

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,CustomerId,CustomerKey,InvoiceDateKey,InvoiceId,InvoiceKey,InvoiceLineId,Quantity,Total,TrackId,UnitPrice,fact_music_sale_key,row_num,total_count,_rescued_data,receipt_time,source_file
700 W Pender Street,Vancouver,Canada,V6C 1G8,15,15,20102443,276,276,1494,1,5.94,2078,0.99,1494,1494,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1495,1,8.91,2084,0.99,1495,1495,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1496,1,8.91,2090,0.99,1496,1496,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1497,1,8.91,2096,0.99,1497,1497,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1498,1,8.91,2102,0.99,1498,1498,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1499,1,8.91,2108,0.99,1499,1499,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1500,1,8.91,2114,0.99,1500,1500,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1501,1,8.91,2120,0.99,1501,1501,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1502,1,8.91,2126,0.99,1502,1502,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json
801 W 4th Street,Reno,USA,89503,21,21,20102446,277,277,1503,1,8.91,2132,0.99,1503,1503,2240,,2024-12-06T04:38:49.897Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_3.json


In [0]:
(spark.table("music_sales_bronze_tempview")
      .writeStream #writing it to a delta table,reading from music sales bronze tempview, and runs until interruption
      .format("delta")#create delta table and use specfic chekpoint location below
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")#loads data into stream and then append new rows to table and call that fact music sales bronze below
      .table("fact_music_sales_bronze"))


<pyspark.sql.streaming.query.StreamingQuery at 0x7f5c786c6450>

##### Silver Table: Including the reference data

In [0]:
(spark.readStream
  .table("fact_music_sales_bronze")
  .createOrReplaceTempView("music_sales_silver_tempview"))
  ##create new tempview

In [0]:
%sql
SELECT * FROM music_sales_silver_tempview
/*proving I have data in music sales table*/

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,CustomerId,CustomerKey,InvoiceDateKey,InvoiceId,InvoiceKey,InvoiceLineId,Quantity,Total,TrackId,UnitPrice,fact_music_sale_key,row_num,total_count,_rescued_data,receipt_time,source_file
Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,2,2,20101232,1,1,1,1,1.98,2,0.99,1,1,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,2,2,20101232,1,1,2,1,1.98,4,0.99,2,2,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Ullevålsveien 14,Oslo,Norway,0171,4,4,20101233,2,2,3,1,3.96,6,0.99,3,3,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Ullevålsveien 14,Oslo,Norway,0171,4,4,20101233,2,2,4,1,3.96,8,0.99,4,4,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Ullevålsveien 14,Oslo,Norway,0171,4,4,20101233,2,2,5,1,3.96,10,0.99,5,5,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Ullevålsveien 14,Oslo,Norway,0171,4,4,20101233,2,2,6,1,3.96,12,0.99,6,6,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Grétrystraat 63,Brussels,Belgium,1000,8,8,20101234,3,3,7,1,5.94,16,0.99,7,7,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Grétrystraat 63,Brussels,Belgium,1000,8,8,20101234,3,3,8,1,5.94,20,0.99,8,8,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Grétrystraat 63,Brussels,Belgium,1000,8,8,20101234,3,3,9,1,5.94,24,0.99,9,9,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
Grétrystraat 63,Brussels,Belgium,1000,8,8,20101234,3,3,10,1,5.94,28,0.99,10,10,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json


In [0]:
%sql
DESCRIBE EXTENDED music_sales_silver_tempview /*describing data types in table*/

col_name,data_type,comment
BillingAddress,string,
BillingCity,string,
BillingCountry,string,
BillingPostalCode,string,
CustomerId,bigint,
CustomerKey,bigint,
InvoiceDateKey,bigint,
InvoiceId,bigint,
InvoiceKey,bigint,
InvoiceLineId,bigint,


In [0]:
%sql
/*Creating tempview for fact music sales silver*/
CREATE OR REPLACE TEMPORARY VIEW fact_music_sales_silver_tempview AS (
SELECT 
    ms.fact_music_sale_key, /*in order to join listing desired columns with aliases for clarity */
    ms.InvoiceKey,
    ms.InvoiceId,
    ms.CustomerKey,
    ms.CustomerId,
    c.FirstName,
    c.LastName,
    ms.InvoiceDateKey,
    ms.TrackId, 
    ms.UnitPrice,
    ms.Quantity,
    ms.Total,
    ms.BillingAddress,
    ms.BillingCity,
    ms.BillingCountry,
    ms.BillingPostalCode,
    ms.row_num,
    ms.total_count,
    ms._rescued_data,
    ms.receipt_time,
    ms.source_file
FROM music_sales_silver_tempview AS ms /*joining music sales silver tempview*/
INNER JOIN chinook_dlh.dim_customer AS c /* joining customer dimension based on customer id column*/
ON ms.CustomerId = c.CustomerId
LEFT OUTER JOIN chinook_dlh.dim_invoice_line AS il /*joining invoice line dimension based on track id column */
ON ms.TrackId = il.TrackId
LEFT OUTER JOIN chinook_dlh.dim_date AS idk /*joining data dimension on the date key column*/
ON ms.InvoiceDateKey = idk.date_key
);


In [0]:
# code for writing data from fact music sales silver tempview into a delta table
(spark.table("fact_music_sales_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint") # saving metadata
      .outputMode("append") #adding rows from the stream to table
      .table("fact_music_sales_silver")) #writing it into the table

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5c786b7710>

In [0]:
%sql
SELECT * FROM fact_music_sales_silver /*showing data in fact music sales silver table*/

fact_music_sale_key,InvoiceKey,InvoiceId,CustomerKey,CustomerId,FirstName,LastName,InvoiceDateKey,TrackId,UnitPrice,Quantity,Total,BillingAddress,BillingCity,BillingCountry,BillingPostalCode,row_num,total_count,_rescued_data,receipt_time,source_file
1,1,1,2,2,Leonie,Köhler,20101232,2,0.99,1,1.98,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,1,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
2,1,1,2,2,Leonie,Köhler,20101232,4,0.99,1,1.98,Theodor-Heuss-Straße 34,Stuttgart,Germany,70174,2,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
3,2,2,4,4,Bjørn,Hansen,20101233,6,0.99,1,3.96,Ullevålsveien 14,Oslo,Norway,0171,3,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
4,2,2,4,4,Bjørn,Hansen,20101233,8,0.99,1,3.96,Ullevålsveien 14,Oslo,Norway,0171,4,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
5,2,2,4,4,Bjørn,Hansen,20101233,10,0.99,1,3.96,Ullevålsveien 14,Oslo,Norway,0171,5,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
6,2,2,4,4,Bjørn,Hansen,20101233,12,0.99,1,3.96,Ullevålsveien 14,Oslo,Norway,0171,6,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
7,3,3,8,8,Daan,Peeters,20101234,16,0.99,1,5.94,Grétrystraat 63,Brussels,Belgium,1000,7,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
8,3,3,8,8,Daan,Peeters,20101234,20,0.99,1,5.94,Grétrystraat 63,Brussels,Belgium,1000,8,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
9,3,3,8,8,Daan,Peeters,20101234,24,0.99,1,5.94,Grétrystraat 63,Brussels,Belgium,1000,9,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json
10,3,3,8,8,Daan,Peeters,20101234,28,0.99,1,5.94,Grétrystraat 63,Brussels,Belgium,1000,10,2240,,2024-12-06T04:39:30.44Z,dbfs:/FileStore/final_project_data/sales/stream2/music_sales/fact_music_sales_1.json


In [0]:
%sql
DESCRIBE EXTENDED chinook_dlh.fact_music_sales_silver /*describing data types in fact music sales silver table*/

col_name,data_type,comment
fact_music_sale_key,bigint,
InvoiceKey,bigint,
InvoiceId,bigint,
CustomerKey,bigint,
CustomerId,bigint,
FirstName,string,
LastName,string,
InvoiceDateKey,bigint,
TrackId,bigint,
UnitPrice,double,


##### Gold table: Performing Aggregations 

In [0]:
%sql
/*Creating a table to put this data returned from aggregation*/
CREATE OR REPLACE TABLE chinook_dlh.fact_quantities_by_customer_gold AS  (
SELECT 
    LastName AS CustomerLastName, 
    ms.UnitPrice, /*Including unit price for transparency on what customer paid per unit purchased*/
    SUM(ms.Quantity) AS TotalQuantity,
    SUM(ms.Quantity * ms.UnitPrice) AS TotalCost
/*finding the total quantity and the total cost for each customer through SUM and using aliases for the column names for clarity*/
FROM chinook_dlh.fact_music_sales_silver AS ms /*specifying where data is coming from*/
GROUP BY LastName,ms.UnitPrice/*groups by last name so that returned total quanity and cost will be aggregated for each of the customers in the table*/
ORDER BY TotalCost DESC);/*ordering from highest to lowest cost*/

SELECT * FROM chinook_dlh.fact_quantities_by_customer_gold;
/* displaying the table and when selecting invoking the stream*/

CustomerLastName,UnitPrice,TotalQuantity,TotalCost
Peeters,0.99,50,49.49999999999998
Brooks,0.99,49,48.50999999999999
Gutiérrez,0.99,49,48.509999999999984
Sampaio,0.99,49,48.509999999999984
Jones,0.99,48,47.51999999999998
Brown,0.99,48,47.51999999999998
Sullivan,0.99,48,47.51999999999998
Almeida,0.99,48,47.51999999999998
Philips,0.99,48,47.51999999999998
Schneider,0.99,48,47.51999999999998


#### Cleaning the File System and removing final project data directory and files within

In [0]:
%fs rm -r /FileStore/final_project-data/ 

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data