## Chinook Music Store Streaming Analytics Project - DS 2002
### Overview
This project implements a real-time streaming analytics pipeline using Databricks Auto Loader to process and analyze digital music sales data. Building upon the Chinook data warehouse foundation, I've developed a Bronze-Silver-Gold architecture that enables real-time analysis of customer purchasing patterns, track popularity, and sales performance across multiple dimensions.

### Data Architecture
The solution uses a modern lakehouse architecture with three distinct layers:
* ***Bronze Layer***: Captures raw JSON sales data using Auto Loader, providing exactly-once processing guarantees and efficient handling of new files
* ***Silver Layer***: Enriches sales data by joining with dimension tables (customers, products, employees, dates) to create a comprehensive analytical view
* ***Gold Layer***: Aggregates data into business-focused metrics for analyzing sales patterns, customer behavior, and product performance

### Streaming Implementation
The pipeline leverages Databricks Auto Loader to process sales data files as they arrive in cloud storage. The implementation includes:
* Automated schema inference and evolution handling
* Fault-tolerant processing with checkpoint management
* Real-time enrichment with dimension tables
* Aggregation of key business metrics in the gold layer

### Analysis Capabilities
The streaming architecture enables real-time analytics across multiple dimensions:
* Track popularity and genre performance trends
* Customer purchasing patterns by region and time
* Support representative effectiveness
* Revenue analysis by artist and album
* Temporal sales patterns and seasonality

This modern streaming approach extends the original data warehouse capabilities by providing near real-time insights into the music store's operations, enabling more timely and data-driven business decisions.

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): the user names and passwords in this cell are hard-coded in
# plain text; consider loading them from a secret store before sharing.
jdbc_hostname = "ds2002-mysql-paq6ha.mysql.database.azure.com"
jdbc_port = 3306
src_database = "chinook_dw"

# JDBC properties (user, password, driver class) for the MySQL source.
connection_properties = {
  "user" : "MustafaLo5253",
  "password" : "Datascience2002",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# These four values are the arguments handed to the Mongo helper functions.
atlas_cluster_name = "cluster-ds-2002.kvu5a"
atlas_database_name = "chinook_dw"
atlas_user_name = "mustafalonandwala750"
atlas_password = "MongodbDS2002"

# Data Files (JSON) Information ###############################
# Name of the destination (lakehouse) database.
dst_database = "chinook_dlh"

# Every path below is derived from base_dir on DBFS.
base_dir = "dbfs:/FileStore/project2-lab-data"
database_dir = f"{base_dir}/{dst_database}"

# Landing folders: batch holds reference files, stream holds incoming files.
batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"

sales_stream_dir = f"{stream_dir}/sales"

# Output roots for the three fact_sales layers, all under database_dir.
sales_output_bronze = f"{database_dir}/fact_sales/bronze"
sales_output_silver = f"{database_dir}/fact_sales/silver"
sales_output_gold = f"{database_dir}/fact_sales/gold"

# Delete the Streaming Files ##################################
# Removes the fact_sales folder (second argument True = recursive), which
# contains the bronze/silver/gold output paths defined above.
dbutils.fs.rm(f"{database_dir}/fact_sales", True)

# Delete the Database Files ###################################
# Removes the whole database folder; fact_sales lives inside it, so this call
# also covers the delete above.
dbutils.fs.rm(database_dir, True)


True

#### 3.0. Define Global Functions

In [None]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Each of ``conditions``, ``projection`` and ``sort`` is applied on its own
    whenever it is provided; a falsy value simply disables that part of the
    query. (Previously a filter was silently dropped unless a projection was
    also supplied, and a sort was dropped unless both were supplied.)

    Args:
        user_id: Atlas user name placed in the connection URI.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix (``<cluster_name>.mongodb.net``).
        db_name: Database to query.
        collection: Name of the collection inside ``db_name``.
        conditions: Query filter document; falsy matches every document.
        projection: Field projection; falsy returns whole documents.
        sort: Sort specification handed to ``Cursor.sort``; falsy leaves the
            result order unspecified.

    Returns:
        A DataFrame built with this module's ``pd`` alias from the matching
        documents.
    """
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)
    try:
        # Normalise falsy arguments to PyMongo's "no filter / no projection".
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)

        # Materialise the cursor before the connection is closed.
        return pd.DataFrame(list(cursor))
    finally:
        # Always release the connection, even when the query raises.
        client.close()

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB collections from JSON files.

    For every ``collection name -> file name`` entry in ``json_files`` the
    existing collection is dropped, the JSON file is read from
    ``src_file_path`` and its contents are bulk-inserted with ``insert_many``.

    Args:
        user_id: Atlas user name placed in the connection URI.
        pwd: Password for ``user_id``.
        cluster_name: Cluster host prefix (``<cluster_name>.mongodb.net``).
        db_name: Target database.
        src_file_path: Directory that contains the JSON files.
        json_files: Mapping of collection name to JSON file name.

    Returns:
        The ``insert_many`` result of the last collection loaded, or ``None``
        when ``json_files`` is empty (this used to raise ``UnboundLocalError``).
    """
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)

    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            # Drop first so that re-running the load does not duplicate documents.
            db.drop_collection(collection_name)

            json_path = os.path.join(src_file_path, file_name)
            # Explicit UTF-8: the source data contains non-ASCII characters.
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            result = db[collection_name].insert_many(documents)
    finally:
        # Close the connection even if a file is missing or an insert fails.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data
#### 1.0. Fetch Reference Data From an Azure MySQL Database
#### 1.1. Create a New Databricks Metadata Database.

In [None]:
%sql
DROP DATABASE IF EXISTS chinook_dlh CASCADE;

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS chinook_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/project2-lab-data/chinook_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Project 2");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [None]:
%sql
-- Create a temporary view to extract date dimension from Azure MySQL
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-paq6ha.mysql.database.azure.com:3306/chinook_dw",
  dbtable "dim_date",
  user "MustafaLo5253",
  password "Datascience2002"
)

In [None]:
%sql
-- Use the Chinook data lakehouse database
USE DATABASE chinook_dlh;

-- Create the date dimension table in the lakehouse
CREATE OR REPLACE TABLE chinook_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/project2-lab-data/chinook_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [None]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [None]:
%sql
SELECT * FROM chinook_dlh.dim_date LIMIT 5;

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Employee Dimension Data from an Azure MySQL database.

In [None]:
%sql
-- Create a temporary view to extract the employee dimension (dim_employee) from Azure MySQL
-- NOTE(review): the JDBC user and password are embedded in plain text here.
CREATE OR REPLACE TEMPORARY VIEW view_employee
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-mysql-paq6ha.mysql.database.azure.com:3306/chinook_dw",
  dbtable "dim_employee",
  user "MustafaLo5253",
  password "Datascience2002"
)

In [None]:
%sql
-- Use the Chinook data lakehouse database
USE DATABASE chinook_dlh;

-- Create the employee dimension table in the lakehouse
CREATE OR REPLACE TABLE chinook_dlh.dim_employee
COMMENT "Employee Dimension Table"
LOCATION "dbfs:/FileStore/project2-lab-data/chinook_dlh/dim_employee"
AS SELECT * FROM view_employee

num_affected_rows,num_inserted_rows


In [None]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_employee;

col_name,data_type,comment
EmployeeKey,bigint,
EmployeeId,bigint,
LastName,varchar(65535),
FirstName,varchar(65535),
Title,varchar(65535),
ReportsTo,double,
BirthDate,timestamp,
HireDate,timestamp,
Address,varchar(65535),
City,varchar(65535),


In [None]:
%sql
SELECT * FROM chinook_dlh.dim_employee LIMIT 5

EmployeeKey,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email,ReportsToName
1,1,Adams,Andrew,General Manager,,1962-02-18T00:00:00Z,2002-08-14T00:00:00Z,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com,No Manager
2,2,Edwards,Nancy,Sales Manager,1.0,1958-12-08T00:00:00Z,2002-05-01T00:00:00Z,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com,Andrew Adams
3,3,Peacock,Jane,Sales Support Agent,2.0,1973-08-29T00:00:00Z,2002-04-01T00:00:00Z,1111 6 Ave SW,Calgary,AB,Canada,T2P 5M5,+1 (403) 262-3443,+1 (403) 262-6712,jane@chinookcorp.com,Nancy Edwards
4,4,Park,Margaret,Sales Support Agent,2.0,1947-09-19T00:00:00Z,2003-05-03T00:00:00Z,683 10 Street SW,Calgary,AB,Canada,T2P 5G3,+1 (403) 263-4423,+1 (403) 263-4289,margaret@chinookcorp.com,Nancy Edwards
5,5,Johnson,Steve,Sales Support Agent,2.0,1965-03-03T00:00:00Z,2003-10-17T00:00:00Z,7727B 41 Ave,Calgary,AB,Canada,T3B 1Y7,1 (780) 836-9987,1 (780) 836-9543,steve@chinookcorp.com,Nancy Edwards


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [None]:
# View the files in your batch directory
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/project2-lab-data/batch'

path,name,size,modificationTime
dbfs:/FileStore/project2-lab-data/batch/Chinook_DimCustomer.json,Chinook_DimCustomer.json,26847,1733441261000
dbfs:/FileStore/project2-lab-data/batch/Chinook_DimProduct.csv,Chinook_DimProduct.csv,127202,1733441261000


In [None]:
# Define source directory and JSON files for MongoDB
source_dir = '/dbfs/FileStore/project2-lab-data/batch'
json_files = {
    "customers": 'Chinook_DimCustomer.json'
}

# Load the JSON files into MongoDB collections
set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

<pymongo.results.InsertManyResult at 0x7f52e4eaffc0>

###### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [None]:
%scala
// Set up MongoDB connection in Scala
import com.mongodb.spark._

val userName = "mustafalonandwala750"
val pwd = "MongodbDS2002"
val clusterName = "cluster-ds-2002.kvu5a"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [None]:
%scala
// Create DataFrame from MongoDB customer collection
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "chinook_dw")
.option("collection", "customers")
.load()
.select("CustomerId", "FirstName", "LastName", "Company", "Address", "City", "State", "Country", "PostalCode", "Phone", "Email", "SupportRepFirstName", "SupportRepLastName", "TotalPurchases", "TotalSpent")

display(df_customer.head(5))

CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepFirstName,SupportRepLastName,TotalPurchases,TotalSpent
1,Luís,Gonçalves,"""Embraer - Empresa Brasileira de Aeronáutica S.A.""","""Av. Brigadeiro Faria Lima, 2170""","""São José dos Campos""","""SP""","""Brazil""","""12227-000""","""+55 (12) 3923-5555""","""luisg@embraer.com.br""","""Jane""","""Peacock""",7,39.62
2,Leonie,Köhler,,"""Theodor-Heuss-Straße 34""","""Stuttgart""",,"""Germany""","""70174""","""+49 0711 2842222""","""leonekohler@surfeu.de""","""Steve""","""Johnson""",7,37.62
3,François,Tremblay,,"""1498 rue Bélanger""","""Montréal""","""QC""","""Canada""","""H2G 1A7""","""+1 (514) 721-4711""","""ftremblay@gmail.com""","""Jane""","""Peacock""",7,39.62
4,Bjørn,Hansen,,"""Ullevålsveien 14""","""Oslo""",,"""Norway""","""0171""","""+47 22 44 22 22""","""bjorn.hansen@yahoo.no""","""Margaret""","""Park""",7,39.62
5,František,Wichterlová,"""JetBrains s.r.o.""","""Klanova 9/506""","""Prague""",,"""Czech Republic""","""14700""","""+420 2 4172 5555""","""frantisekw@jetbrains.com""","""Margaret""","""Park""",7,40.62


In [None]:
%scala
df_customer.printSchema()

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (chinook_dlh)

In [None]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_customer")

In [None]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_customer

col_name,data_type,comment
CustomerId,int,
FirstName,string,
LastName,string,
Company,string,
Address,string,
City,string,
State,string,
Country,string,
PostalCode,string,
Phone,string,


In [None]:
%sql
SELECT * FROM chinook_dlh.dim_customer LIMIT 5

CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepFirstName,SupportRepLastName,TotalPurchases,TotalSpent
1,Luís,Gonçalves,"""Embraer - Empresa Brasileira de Aeronáutica S.A.""","""Av. Brigadeiro Faria Lima, 2170""","""São José dos Campos""","""SP""","""Brazil""","""12227-000""","""+55 (12) 3923-5555""","""luisg@embraer.com.br""","""Jane""","""Peacock""",7,39.62
2,Leonie,Köhler,,"""Theodor-Heuss-Straße 34""","""Stuttgart""",,"""Germany""","""70174""","""+49 0711 2842222""","""leonekohler@surfeu.de""","""Steve""","""Johnson""",7,37.62
3,François,Tremblay,,"""1498 rue Bélanger""","""Montréal""","""QC""","""Canada""","""H2G 1A7""","""+1 (514) 721-4711""","""ftremblay@gmail.com""","""Jane""","""Peacock""",7,39.62
4,Bjørn,Hansen,,"""Ullevålsveien 14""","""Oslo""",,"""Norway""","""0171""","""+47 22 44 22 22""","""bjorn.hansen@yahoo.no""","""Margaret""","""Park""",7,39.62
5,František,Wichterlová,"""JetBrains s.r.o.""","""Klanova 9/506""","""Prague""",,"""Czech Republic""","""14700""","""+420 2 4172 5555""","""frantisekw@jetbrains.com""","""Margaret""","""Park""",7,40.62


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [None]:
# Location of the product dimension extract inside the batch folder
product_csv = f"{batch_dir}/Chinook_DimProduct.csv"

# Configure the CSV reader one option at a time: the first row is the header
# and column types are inferred from the data
product_reader = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
)
df_product = product_reader.load(product_csv)

# Preview the first five rows
display(df_product.head(5))

TrackId,TrackName,Composer,Milliseconds,Bytes,UnitPrice,AlbumTitle,ArtistName,GenreName,MediaTypeName
1,For Those About To Rock (We Salute You),"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,For Those About To Rock We Salute You,AC/DC,Rock,MPEG audio file
2,Balls to the Wall,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",342562,5510424,0.99,Balls to the Wall,Accept,Rock,Protected AAC audio file
3,Fast As a Shark,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Hoffman",230619,3990994,0.99,Restless and Wild,Accept,Rock,Protected AAC audio file
4,Restless and Wild,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",252051,4331779,0.99,Restless and Wild,Accept,Rock,Protected AAC audio file
5,Princess of the Dawn,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Restless and Wild,Accept,Rock,Protected AAC audio file


In [None]:
# Show schema of loaded data
df_product.printSchema()

root
 |-- TrackId: integer (nullable = true)
 |-- TrackName: string (nullable = true)
 |-- Composer: string (nullable = true)
 |-- Milliseconds: integer (nullable = true)
 |-- Bytes: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- AlbumTitle: string (nullable = true)
 |-- ArtistName: string (nullable = true)
 |-- GenreName: string (nullable = true)
 |-- MediaTypeName: string (nullable = true)



In [None]:
# Create Delta table from DataFrame
df_product.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_product")

In [None]:
%sql
DESCRIBE EXTENDED chinook_dlh.dim_product;

col_name,data_type,comment
TrackId,int,
TrackName,string,
Composer,string,
Milliseconds,int,
Bytes,int,
UnitPrice,double,
AlbumTitle,string,
ArtistName,string,
GenreName,string,
MediaTypeName,string,


In [None]:
%sql
SELECT * FROM chinook_dlh.dim_product LIMIT 5;

TrackId,TrackName,Composer,Milliseconds,Bytes,UnitPrice,AlbumTitle,ArtistName,GenreName,MediaTypeName
1,For Those About To Rock (We Salute You),"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,For Those About To Rock We Salute You,AC/DC,Rock,MPEG audio file
2,Balls to the Wall,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",342562,5510424,0.99,Balls to the Wall,Accept,Rock,Protected AAC audio file
3,Fast As a Shark,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Hoffman",230619,3990994,0.99,Restless and Wild,Accept,Rock,Protected AAC audio file
4,Restless and Wild,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",252051,4331779,0.99,Restless and Wild,Accept,Rock,Protected AAC audio file
5,Princess of the Dawn,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Restless and Wild,Accept,Rock,Protected AAC audio file


##### Verify Dimension Tables

In [None]:
%sql
USE chinook_dlh;
SHOW TABLES

database,tableName,isTemporary
chinook_dlh,dim_customer,False
chinook_dlh,dim_date,False
chinook_dlh,dim_employee,False
chinook_dlh,dim_product,False
,_sqldf,True
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,display_query_5,True


%md
### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Sales Fact Data
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [None]:
# Bronze ingest: read the JSON sales files from the stream folder with
# Auto Loader ("cloudFiles") and expose the stream to SQL as a temp view
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": sales_output_bronze,
    "cloudFiles.inferColumnTypes": "true",
    "multiLine": "true",
}

sales_raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(sales_stream_dir)
)
sales_raw_stream.createOrReplaceTempView("sales_raw_tempview")

In [None]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW sales_bronze_tempview AS (
  SELECT *, 
         current_timestamp() receipt_time, 
         input_file_name() source_file
  FROM sales_raw_tempview
)

In [None]:
%sql
SELECT * FROM sales_bronze_tempview LIMIT 5;

CustomerKey,DateKey,Discount,GrossAmount,InvoiceId,InvoiceLineId,NetAmount,ProductKey,Quantity,SalesKey,UnitPrice,_rescued_data,receipt_time,source_file
32,20220517,0,0.99,116,628,0.99,293,1,628,0.99,,2024-12-06T02:44:11.232Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch2.json
32,20220517,0,0.99,116,629,0.99,299,1,629,0.99,,2024-12-06T02:44:11.232Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch2.json
41,20220522,0,0.99,117,630,0.99,308,1,630,0.99,,2024-12-06T02:44:11.232Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch2.json
41,20220522,0,0.99,117,631,0.99,317,1,631,0.99,,2024-12-06T02:44:11.232Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch2.json
41,20220522,0,0.99,117,632,0.99,326,1,632,0.99,,2024-12-06T02:44:11.232Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch2.json


In [None]:
# Persist the bronze view as a Delta table in append mode; the checkpoint
# lives under the bronze output directory
bronze_checkpoint = f"{sales_output_bronze}/_checkpoint"

bronze_writer = (
    spark.table("sales_bronze_tempview")
    .writeStream
    .format("delta")
    .option("checkpointLocation", bronze_checkpoint)
    .outputMode("append")
)
bronze_writer.table("fact_sales_bronze")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f52e47a1ed0>

##### 6.2. Silver Table: Include Reference Data

In [None]:
# Open the bronze table as a stream and register it as the silver input view
bronze_stream = spark.readStream.table("fact_sales_bronze")
bronze_stream.createOrReplaceTempView("sales_silver_tempview")

In [None]:
%sql
SELECT * FROM sales_silver_tempview LIMIT 5;

CustomerKey,DateKey,Discount,GrossAmount,InvoiceId,InvoiceLineId,NetAmount,ProductKey,Quantity,SalesKey,UnitPrice,_rescued_data,receipt_time,source_file
2,20210101,0,0.99,1,1,0.99,2,1,1,0.99,,2024-12-06T02:44:52.997Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch1.json
2,20210101,0,0.99,1,2,0.99,4,1,2,0.99,,2024-12-06T02:44:52.997Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch1.json
4,20210102,0,0.99,2,3,0.99,6,1,3,0.99,,2024-12-06T02:44:52.997Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch1.json
4,20210102,0,0.99,2,4,0.99,8,1,4,0.99,,2024-12-06T02:44:52.997Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch1.json
4,20210102,0,0.99,2,5,0.99,10,1,5,0.99,,2024-12-06T02:44:52.997Z,dbfs:/FileStore/project2-lab-data/stream/sales/sales_batch1.json


In [None]:
%sql
DESCRIBE EXTENDED sales_silver_tempview

col_name,data_type,comment
CustomerKey,bigint,
DateKey,bigint,
Discount,bigint,
GrossAmount,double,
InvoiceId,bigint,
InvoiceLineId,bigint,
NetAmount,double,
ProductKey,bigint,
Quantity,bigint,
SalesKey,bigint,


In [None]:
%sql
-- Silver layer: enrich every sales row from sales_silver_tempview with
-- descriptive attributes looked up in three chinook_dlh dimension tables:
--   * dim_customer (CustomerId = CustomerKey): first name, last name, country
--   * dim_product  (TrackId = ProductKey): track name, composer,
--                  Milliseconds (exposed as duration) and Bytes
--   * dim_date     (date_key = DateKey): day name, month name, calendar
--                  quarter and calendar year
-- LEFT JOINs keep every sales row; the dimension columns are NULL when a
-- lookup finds no match. dim_employee is not joined in this view.
-- The trailing SELECT only previews five rows of the new view.

CREATE OR REPLACE TEMPORARY VIEW fact_sales_silver_tempview AS (
  SELECT s.SalesKey,
      s.InvoiceId,
      s.InvoiceLineId,
      s.CustomerKey,
      c.FirstName AS customer_first_name,
      c.LastName AS customer_last_name,
      c.Country AS customer_country,
      s.ProductKey,
      p.TrackName AS track_name,
      p.Composer AS composer,
      p.Milliseconds AS duration,
      p.Bytes AS bytes,
      s.DateKey,
      d.day_name_of_week,
      d.month_name,
      d.calendar_quarter,
      d.calendar_year,
      s.Quantity,
      s.UnitPrice,
      s.GrossAmount,
      s.Discount,
      s.NetAmount
  FROM sales_silver_tempview AS s
  LEFT JOIN chinook_dlh.dim_customer AS c
    ON c.CustomerId = s.CustomerKey
  LEFT JOIN chinook_dlh.dim_product AS p
    ON p.TrackId = s.ProductKey
  LEFT JOIN chinook_dlh.dim_date AS d
    ON d.date_key = s.DateKey
);
SELECT * FROM fact_sales_silver_tempview LIMIT 5;

SalesKey,InvoiceId,InvoiceLineId,CustomerKey,customer_first_name,customer_last_name,customer_country,ProductKey,track_name,composer,duration,bytes,DateKey,day_name_of_week,month_name,calendar_quarter,calendar_year,Quantity,UnitPrice,GrossAmount,Discount,NetAmount
1,1,1,2,Leonie,Köhler,"""Germany""",2,Balls to the Wall,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",342562,5510424,20210101,Friday,January,1,2021,1,0.99,0.99,0,0.99
2,1,2,2,Leonie,Köhler,"""Germany""",4,Restless and Wild,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",252051,4331779,20210101,Friday,January,1,2021,1,0.99,0.99,0,0.99
3,2,3,4,Bjørn,Hansen,"""Norway""",6,Put The Finger On You,"Angus Young, Malcolm Young, Brian Johnson",205662,6713451,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99
4,2,4,4,Bjørn,Hansen,"""Norway""",8,Inject The Venom,"Angus Young, Malcolm Young, Brian Johnson",210834,6852860,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99
5,2,5,4,Bjørn,Hansen,"""Norway""",10,Evil Walks,"Angus Young, Malcolm Young, Brian Johnson",263497,8611245,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99


In [None]:
# Persist the enriched silver view as a Delta table in append mode, with the
# mergeSchema option switched on; the checkpoint lives under the silver
# output directory
silver_checkpoint = f"{sales_output_silver}/_checkpoint"

silver_writer = (
    spark.table("fact_sales_silver_tempview")
    .writeStream
    .format("delta")
    .option("checkpointLocation", silver_checkpoint)
    .option("mergeSchema", "true")
    .outputMode("append")
)
silver_writer.table("fact_sales_silver")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f52e4669390>

In [None]:
%sql
SELECT * FROM fact_sales_silver LIMIT 10;

SalesKey,InvoiceId,InvoiceLineId,CustomerKey,customer_first_name,customer_last_name,customer_country,ProductKey,track_name,composer,duration,bytes,DateKey,day_name_of_week,month_name,calendar_quarter,calendar_year,Quantity,UnitPrice,GrossAmount,Discount,NetAmount
1,1,1,2,Leonie,Köhler,"""Germany""",2,Balls to the Wall,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",342562,5510424,20210101,Friday,January,1,2021,1,0.99,0.99,0,0.99
2,1,2,2,Leonie,Köhler,"""Germany""",4,Restless and Wild,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",252051,4331779,20210101,Friday,January,1,2021,1,0.99,0.99,0,0.99
3,2,3,4,Bjørn,Hansen,"""Norway""",6,Put The Finger On You,"Angus Young, Malcolm Young, Brian Johnson",205662,6713451,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99
4,2,4,4,Bjørn,Hansen,"""Norway""",8,Inject The Venom,"Angus Young, Malcolm Young, Brian Johnson",210834,6852860,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99
5,2,5,4,Bjørn,Hansen,"""Norway""",10,Evil Walks,"Angus Young, Malcolm Young, Brian Johnson",263497,8611245,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99
6,2,6,4,Bjørn,Hansen,"""Norway""",12,Breaking The Rules,"Angus Young, Malcolm Young, Brian Johnson",263288,8596840,20210102,Saturday,January,1,2021,1,0.99,0.99,0,0.99
7,3,7,8,Daan,Peeters,"""Belgium""",16,Dog Eat Dog,AC/DC,215196,7032162,20210103,Sunday,January,1,2021,1,0.99,0.99,0,0.99
8,3,8,8,Daan,Peeters,"""Belgium""",20,Overdose,AC/DC,369319,12066294,20210103,Sunday,January,1,2021,1,0.99,0.99,0,0.99
9,3,9,8,Daan,Peeters,"""Belgium""",24,Love In An Elevator,"Steven Tyler, Joe Perry",321828,10552051,20210103,Sunday,January,1,2021,1,0.99,0.99,0,0.99
10,3,10,8,Daan,Peeters,"""Belgium""",28,Janie's Got A Gun,"Steven Tyler, Tom Hamilton",330736,10869391,20210103,Sunday,January,1,2021,1,0.99,0.99,0,0.99


In [None]:
%sql
DESCRIBE EXTENDED chinook_dlh.fact_sales_silver

col_name,data_type,comment
SalesKey,bigint,
InvoiceId,bigint,
InvoiceLineId,bigint,
CustomerKey,bigint,
customer_first_name,string,
customer_last_name,string,
customer_country,string,
ProductKey,bigint,
track_name,string,
composer,string,


##### 6.3. Gold Table: Perform Aggregations


In [None]:
%sql
-- Gold layer: number of tracks each customer bought per calendar month.
-- - Groups silver sales rows by customer, calendar year and month name, so
--   the same month in different years is no longer merged into one row
-- - TrackCount counts the sales rows (ProductKey) in each group
-- - Materialised with CTAS (Create Table As Select)
-- - OrderYear is appended after the original columns so existing
--   column positions are unchanged
-- The preview query carries its own ORDER BY: the row order of a table scan
-- is not guaranteed, so LIMIT alone would return an arbitrary 20 rows.

CREATE OR REPLACE TABLE chinook_dlh.fact_monthly_sales_by_customer_gold AS (
  SELECT CustomerKey AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , month_name AS OrderMonth
    , COUNT(ProductKey) AS TrackCount
    , calendar_year AS OrderYear
  FROM chinook_dlh.fact_sales_silver
  GROUP BY CustomerID, LastName, FirstName, OrderYear, OrderMonth
  ORDER BY TrackCount DESC);

SELECT *
FROM chinook_dlh.fact_monthly_sales_by_customer_gold
ORDER BY TrackCount DESC
LIMIT 20;

CustomerID,LastName,FirstName,OrderMonth,TrackCount
40,Lefebvre,Dominique,March,14
2,Köhler,Leonie,February,14
19,Goyer,Tim,April,14
3,Tremblay,François,April,14
23,Gordon,John,January,14
22,Leacock,Heather,November,14
39,Bernard,Camille,January,14
18,Brooks,Michelle,February,14
1,Gonçalves,Luís,December,14
38,Schröder,Niklas,October,14


In [None]:
%sql
-- Gold layer: one row per silver sales row, annotated with the total number
-- of tracks bought by that row's customer.
-- - customer_totals (CTE) counts the sales rows per CustomerKey
-- - the silver rows are inner-joined to those totals on the customer key
-- - CustomerName carries the customer's last name, TrackNumber the ProductKey
-- - the CTAS query is ordered by TrackCount, highest first

CREATE OR REPLACE TABLE chinook_dlh.fact_track_sales_by_customer_gold AS
WITH customer_totals AS (
  SELECT CustomerKey AS CustomerID,
         COUNT(ProductKey) AS TrackCount
  FROM chinook_dlh.fact_sales_silver
  GROUP BY CustomerKey
)
SELECT customer_totals.CustomerID,
       sales.customer_last_name AS CustomerName,
       sales.ProductKey AS TrackNumber,
       customer_totals.TrackCount
FROM chinook_dlh.fact_sales_silver AS sales
INNER JOIN customer_totals
  ON customer_totals.CustomerID = sales.CustomerKey
ORDER BY TrackCount DESC;

SELECT * FROM chinook_dlh.fact_track_sales_by_customer_gold LIMIT 20;

CustomerID,CustomerName,TrackNumber,TrackCount
2,Köhler,192,26
2,Köhler,194,26
2,Köhler,196,26
2,Köhler,198,26
2,Köhler,2,26
2,Köhler,4,26
2,Köhler,331,26
2,Köhler,340,26
2,Köhler,349,26
2,Köhler,358,26


In [None]:
%sql
-- Gold layer: sales by music genre and customer country.
-- - Joins silver sales rows to dim_customer (Country) and dim_product (GenreName)
-- - Per genre/country: distinct customers, number of sales rows, net revenue
-- - TotalRevenue is rounded to 2 decimals to remove the floating-point
--   residue (e.g. 47.51999999999999) left by summing DOUBLE amounts
-- The preview query carries its own ORDER BY: the row order of a table scan
-- is not guaranteed, so LIMIT alone would return an arbitrary 20 rows.
CREATE OR REPLACE TABLE chinook_dlh.fact_sales_by_genre_country_gold AS (
  SELECT 
    p.GenreName,
    c.Country,
    COUNT(DISTINCT s.CustomerKey) AS UniqueCustomers,
    COUNT(s.ProductKey) AS TotalTracks,
    ROUND(SUM(s.NetAmount), 2) AS TotalRevenue
  FROM chinook_dlh.fact_sales_silver AS s
  INNER JOIN chinook_dlh.dim_customer AS c
    ON c.CustomerId = s.CustomerKey
  INNER JOIN chinook_dlh.dim_product AS p
    ON p.TrackId = s.ProductKey
  GROUP BY p.GenreName, c.Country
  ORDER BY TotalRevenue DESC
);
SELECT *
FROM chinook_dlh.fact_sales_by_genre_country_gold
ORDER BY TotalRevenue DESC
LIMIT 20;

GenreName,Country,UniqueCustomers,TotalTracks,TotalRevenue
Latin,"""USA""",10,48,47.51999999999999
Rock,"""USA""",10,43,42.57
Rock,"""France""",5,30,29.7
Latin,"""France""",4,26,25.74
Rock,"""Canada""",6,25,24.750000000000004
Rock,"""Germany""",3,24,23.76
Alternative & Punk,"""USA""",7,22,21.78
Latin,"""Brazil""",4,21,20.79
Latin,"""Canada""",6,20,19.8
Rock,"""Brazil""",4,19,18.81


In [None]:
%sql
-- Gold layer: sales per calendar month.
-- - Joins silver sales rows to dim_date and groups by calendar year and month
-- - UniqueCustomers: distinct customers with a sale in the month
-- - NumberOfTransactions / TotalTracksSOld: counts of sales rows (SalesKey /
--   ProductKey); the TotalTracksSOld spelling is kept so existing readers of
--   the table keep working
-- - TotalRevenue / AverageTransactionValue: sum and mean of NetAmount per row
-- - MonthNumber (dim_date.month_of_year) is appended so months can be ordered
--   chronologically; ordering by the month NAME sorted them alphabetically
--   (April, February, January, ...)
-- The preview query carries its own ORDER BY: the row order of a table scan
-- is not guaranteed, so LIMIT alone would return an arbitrary 20 rows.
CREATE OR REPLACE TABLE chinook_dlh.fact_sales_by_month_gold AS (
  SELECT 
    d.calendar_year AS Year,
    d.month_name AS Month,
    COUNT(DISTINCT s.CustomerKey) AS UniqueCustomers,
    COUNT(s.SalesKey) AS NumberOfTransactions,
    COUNT(s.ProductKey) AS TotalTracksSOld,
    SUM(s.NetAmount) AS TotalRevenue,
    AVG(s.NetAmount) AS AverageTransactionValue,
    d.month_of_year AS MonthNumber
  FROM chinook_dlh.fact_sales_silver AS s
  INNER JOIN chinook_dlh.dim_date AS d
    ON d.date_key = s.DateKey
  GROUP BY Year, Month, MonthNumber
  ORDER BY Year, MonthNumber
);
SELECT *
FROM chinook_dlh.fact_sales_by_month_gold
ORDER BY Year, MonthNumber
LIMIT 20;

Year,Month,UniqueCustomers,NumberOfTransactions,TotalTracksSOld,TotalRevenue,AverageTransactionValue
2021,April,7,38,38,37.620000000000005,0.99
2021,February,7,38,38,37.620000000000005,0.99
2021,January,6,36,36,35.64,0.99
2021,March,7,38,38,37.62,0.99
2021,May,5,20,20,19.8,0.99
2022,April,4,28,28,27.72,0.99
2022,August,5,23,23,22.770000000000003,0.99
2022,July,7,38,38,37.62,0.99
2022,June,7,38,38,37.620000000000005,0.99
2022,May,7,38,38,37.61999999999999,0.9899999999999998
