### Author: Vishal Kamalakrishnan
### Computing Id: cjq2cw

This project will analyze sales for a mock company. The company's reference data is split across three dimension tables: dim_store, dim_date, and dim_country. dim_country is stored in a CSV file in DBFS, dim_date in an Azure Database for MySQL server, and dim_store in a MongoDB database. The fact table from my midterm project, containing the mock company's sales, will be streamed into bronze and silver tables.

To run this lab, place `fact_sale_1.json`, `fact_sale_2.json`, and `fact_sale_3.json` in a `stream` directory, and `dim_country.csv` in a `dim_country` directory, both under `dbfs:/FileStore/project_data`. The MongoDB database should already contain the dim_store collection. If it does not, place `dim_store.json` in a `dim_store` directory under the same path; the `set_mongo_collection` cell reloads the collection from that file.
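Before running the notebook, it can help to confirm the inputs are where the cells expect them. The sketch below is a hypothetical helper (`check_inputs` is not part of the original project). It assumes the DBFS FUSE mount at `/dbfs`, the same mount `source_dir` uses later, and checks only the stream directory and the dim_country CSV, since `dim_store.json` is needed only when the MongoDB collection has to be reloaded.

```python
import os


def check_inputs(base_dir="/dbfs/FileStore/project_data"):
    """Hypothetical pre-flight check: return a list of missing inputs (empty if OK)."""
    problems = []

    # The Auto Loader cell points at this directory and reads its JSON files
    stream_dir = os.path.join(base_dir, "stream")
    if not os.path.isdir(stream_dir) or not any(
        name.endswith(".json") for name in os.listdir(stream_dir)
    ):
        problems.append("no .json files in stream/")

    # The dim_country cell reads this exact path
    if not os.path.isfile(os.path.join(base_dir, "dim_country", "dim_country.csv")):
        problems.append("missing dim_country/dim_country.csv")

    return problems
```

In a notebook cell, `check_inputs()` returning an empty list means both inputs are in place.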

In [0]:
import os
import json
import pymongo
import certifi
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

## Creating New Mock Company Database 

In [0]:
%sql
DROP DATABASE IF EXISTS mock_company CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS mock_company
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/project_data/mock_company"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project Database");

## Defining Variables for Database Storage

In [0]:
dst_database = "mock_company"
base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

sales_output_bronze = f"{database_dir}/fact_sales/bronze"
sales_output_silver = f"{database_dir}/fact_sales/silver"

## Ingesting Reference (Cold Path) Data

### Fetch Reference Dim Store Data from MongoDB

In [0]:
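def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Load each JSON file in json_files into its own MongoDB collection.

    json_files maps a collection name to a JSON file name in src_file_path.
    Returns the InsertManyResult of the last collection loaded (None if json_files is empty).
    '''
    # Create a client connection to MongoDB Atlas
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    result = None
    try:
        for collection_name, file_name in json_files.items():
            # Read the JSON file first, so a missing file fails before anything is dropped
            json_file = os.path.join(src_file_path, file_name)
            with open(json_file, 'r') as openfile:
                json_object = json.load(openfile)

            # Replace any existing collection so reruns don't duplicate documents
            db.drop_collection(collection_name)
            result = db[collection_name].insert_many(json_object)
    finally:
        client.close()

    return result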
     

In [0]:
source_dir = '/dbfs/FileStore/project_data/dim_store'
json_files = {"dim_store" : 'dim_store.json'}

In [0]:
# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.gtykg"
atlas_database_name = "mock_company"
atlas_user_name = "cjq2cw"
atlas_password = "Password123"

In [0]:
set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7fc0d8d1f800>
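Both `set_mongo_collection` and the Scala cell below build the connection string by pasting the user name and password straight into the URI. That works for `Password123`, but the PyMongo documentation notes that a user name or password containing reserved characters such as `@` or `:` must be percent-escaped, for example with `urllib.parse.quote_plus`. `build_atlas_uri` is a hypothetical helper, not part of the original notebook, sketching that escaping.

```python
from urllib.parse import quote_plus


def build_atlas_uri(user_id, pwd, cluster_name, db_name):
    """Hypothetical helper: build a MongoDB Atlas SRV URI with escaped credentials."""
    # quote_plus percent-escapes reserved characters such as '@' and ':'
    return (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )


# A password containing '@' and ':' no longer breaks the URI
print(build_atlas_uri("cjq2cw", "p@ss:word", "cluster0.gtykg", "mock_company"))
# mongodb+srv://cjq2cw:p%40ss%3Aword@cluster0.gtykg.mongodb.net/mock_company
```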

In [0]:
%scala
import com.mongodb.spark._

val userName = "cjq2cw"
val pwd = "Password123"
val clusterName = "cluster0.gtykg"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_dim_store = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "mock_company")
.option("collection", "dim_store").load()

display(df_dim_store)

| _id | country_id | store_id | store_location |
|---|---|---|---|
| List(6757896ccd9465dad6a9a6cf) | 236 | 1 | Denver |
| List(6757896ccd9465dad6a9a6d0) | 236 | 2 | Seattle |
| List(6757896ccd9465dad6a9a6d1) | 235 | 3 | London |
| List(6757896ccd9465dad6a9a6d2) | 41 | 4 | London |
| List(6757896ccd9465dad6a9a6d3) | 236 | 5 | Austin |
| List(6757896ccd9465dad6a9a6d4) | 236 | 6 | New York |
| List(6757896ccd9465dad6a9a6d5) | 236 | 7 | San Diego |


#### Writing Delta Table dim_store

In [0]:
%scala
df_dim_store.write.format("delta").mode("overwrite").saveAsTable("mock_company.dim_store")


In [0]:

%sql
DESCRIBE EXTENDED mock_company.dim_store

col_name,data_type,comment
_id,struct,
country_id,int,
store_id,int,
store_location,string,
,,
# Delta Statistics Columns,,
Column Names,"_id.oid, country_id, store_id, store_location",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


### Fetch Reference Dim Country Data from a CSV File in DBFS
The data has been cleaned to include only the countries that apply to the mock_company data.
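The cleaning step itself is not part of this notebook. One way to do it, assuming the rule is "keep a country only if some store references it", is a semi-join of the country list against dim_store's `country_id` values. `filter_countries` and the `Example Country` row are hypothetical; the three kept countries and the store `country_id` values come from the outputs in this notebook.

```python
def filter_countries(countries, stores):
    """Keep only the countries whose country_id is referenced by at least one store."""
    used_ids = {store["country_id"] for store in stores}
    return [c for c in countries if c["country_id"] in used_ids]


# store_id 1..7 with the country_id values from the dim_store output
stores = [{"store_id": i, "country_id": cid}
          for i, cid in enumerate([236, 236, 235, 41, 236, 236, 236], start=1)]
countries = [
    {"country_id": 41, "name": "Canada"},
    {"country_id": 235, "name": "United Kingdom"},
    {"country_id": 236, "name": "United States"},
    {"country_id": 999, "name": "Example Country"},  # hypothetical row with no store
]

print([c["name"] for c in filter_countries(countries, stores)])
# ['Canada', 'United Kingdom', 'United States']
```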

In [0]:
dim_country_csv = f"{base_dir}/dim_country/dim_country.csv"

df_dim_country = spark.read.format('csv').options(header='true', inferSchema='true').load(dim_country_csv)
display(df_dim_country)

| country_id | name | country_code | iso_3166-2 | region | sub_region | region_code | sub_region_code |
|---|---|---|---|---|---|---|---|
| 41 | Canada | CAN | ISO 3166-2:CA | Americas | Northern America | 19.0 | 21.0 |
| 235 | United Kingdom | GBR | ISO 3166-2:GB | Europe | Northern Europe | 150.0 | 154.0 |
| 236 | United States | USA | ISO 3166-2:US | Americas | Northern America | 19.0 | 21.0 |


In [0]:
df_dim_country.printSchema()

root
 |-- country_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- iso_3166-2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- sub_region: string (nullable = true)
 |-- region_code: double (nullable = true)
 |-- sub_region_code: double (nullable = true)



#### Writing Delta Table dim_country

In [0]:
df_dim_country.write.format("delta").mode("overwrite").saveAsTable("mock_company.dim_country")


In [0]:
%sql
DESCRIBE EXTENDED mock_company.dim_country;

col_name,data_type,comment
country_id,int,
name,string,
country_code,string,
iso_3166-2,string,
region,string,
sub_region,string,
region_code,double,
sub_region_code,double,
,,
# Delta Statistics Columns,,


### Fetch Reference Dim Date Data from Azure Database for MySQL

In [0]:
jdbc_hostname = "cjq2cw-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "mock_company"
jdbc_user = "vishalk"
connection_properties = {
    "user": jdbc_user,
    "password": "Password123",
    "driver": "org.mariadb.jdbc.Driver"
}

In [0]:
def get_sql_dataframe(host_name, port, db_name, conn_props, table):
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    dframe = spark.read.jdbc(url=jdbcUrl, table=table, properties=conn_props)
    return dframe

In [0]:
df_dim_date = get_sql_dataframe(jdbc_hostname, jdbc_port, src_database, connection_properties, "dim_date")
display(df_dim_date)

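| date_id | full_date | year | quarter | month | month_name |
|---|---|---|---|---|---|
| 1 | 2015-03-23 | 2015 | 1 | 3 | March |
| 2 | 2015-08-25 | 2015 | 3 | 8 | August |
| 3 | 2017-06-22 | 2017 | 2 | 6 | June |
| 4 | 2015-02-23 | 2015 | 1 | 2 | February |
| 5 | 2017-12-03 | 2017 | 4 | 12 | December |
| 6 | 2015-09-02 | 2015 | 3 | 9 | September |
| 7 | 2014-11-11 | 2014 | 4 | 11 | November |
| 8 | 2014-03-31 | 2014 | 1 | 3 | March |
| 9 | 2017-03-21 | 2017 | 1 | 3 | March |
| 10 | 2014-08-18 | 2014 | 3 | 8 | August |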
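Each dim_date row stores `year`, `quarter`, `month`, and `month_name` alongside `full_date`, which arrives as a string (see the schema below). Those attributes can all be derived from the date itself. `date_attributes` is a hypothetical helper showing the arithmetic, with the quarter computed as `(month - 1) // 3 + 1`; how the source MySQL table was actually built is not shown in this notebook.

```python
from datetime import date


def date_attributes(full_date):
    """Derive dim_date-style attributes from an ISO 'YYYY-MM-DD' string."""
    d = date.fromisoformat(full_date)
    return {
        "full_date": full_date,
        "year": d.year,
        "quarter": (d.month - 1) // 3 + 1,  # Jan-Mar -> 1, ..., Oct-Dec -> 4
        "month": d.month,
        "month_name": d.strftime("%B"),  # full month name (English in the default C locale)
    }


print(date_attributes("2015-08-25"))
# {'full_date': '2015-08-25', 'year': 2015, 'quarter': 3, 'month': 8, 'month_name': 'August'}
```

The results agree with the rows above, e.g. `2017-06-22` falls in quarter 2 and `2014-11-11` in quarter 4.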


In [0]:
df_dim_date.printSchema()

root
 |-- date_id: long (nullable = true)
 |-- full_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- month_name: string (nullable = true)



#### Writing Delta Table dim_date

In [0]:
df_dim_date.write.format("delta").mode("overwrite").saveAsTable("mock_company.dim_date")


In [0]:
%sql
DESCRIBE EXTENDED mock_company.dim_date;

col_name,data_type,comment
date_id,bigint,
full_date,varchar(65535),
year,int,
quarter,int,
month,int,
month_name,varchar(65535),
,,
# Delta Statistics Columns,,
Column Names,"year, quarter, month_name, full_date, date_id, month",
Column Selection Method,first-32,


## Using Auto Loader to Process Streaming (Hot Path) Fact Table Data

### Streaming Data Into Bronze Table

In [0]:
stream_dir = f"{base_dir}/stream"

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", sales_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("sales_raw_tempview"))
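Auto Loader (`cloudFiles`) processes each file in `stream_dir` once: it records the files it has already ingested in the checkpoint of the streaming write that consumes it (`f"{sales_output_bronze}/_checkpoint"` in the write cell), so later triggers or restarts read only newly arrived files. The toy model below illustrates that bookkeeping for intuition only. `discover_new_files` and the in-memory `processed` set are hypothetical stand-ins for state Auto Loader persists itself, and this is not how Auto Loader is implemented.

```python
import os


def discover_new_files(stream_dir, processed):
    """Return JSON files in stream_dir not seen before, and remember them.

    `processed` is a set standing in for the file-tracking state Auto Loader
    keeps in its checkpoint; here it only lives in memory.
    """
    new_files = sorted(
        name for name in os.listdir(stream_dir)
        if name.endswith(".json") and name not in processed
    )
    processed.update(new_files)
    return new_files
```

Calling it twice in a row returns the existing files once and then an empty list; dropping a new file such as `fact_sale_3.json` into the directory makes only that file appear on the next call.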

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW sales_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM sales_raw_tempview
)

In [0]:
%sql
SELECT * FROM sales_bronze_tempview

coupon_used,date_id,fact_sale_id,purchase_method,store_id,_rescued_data,receipt_time,source_file
0,1527,3865,In store,1,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,251,3866,Online,5,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,567,3867,Online,5,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,845,3868,In store,1,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
1,1205,3869,In store,3,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
1,1205,3870,In store,4,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,205,3871,Online,2,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,1528,3872,In store,5,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,71,3873,In store,5,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json
0,1180,3874,Online,1,,2024-12-10T00:39:30.821Z,dbfs:/FileStore/project_data/stream/fact_sale_3.json


In [0]:
%sql
DESCRIBE EXTENDED sales_bronze_tempview

col_name,data_type,comment
coupon_used,bigint,
date_id,bigint,
fact_sale_id,bigint,
purchase_method,string,
store_id,bigint,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
(spark.table("sales_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc0ec465ed0>

### Bronze Tables - Performing Aggregations and Getting Data from Dim and Fact Tables

In [0]:
%sql
-- total number of sales & sales with coupon by country
SELECT 
    c.name AS country,
    COUNT(f.fact_sale_id) AS total_sales,
    SUM(CASE WHEN 
            f.coupon_used = 1 THEN 1 
                ELSE 0 
        END) 
    AS sales_with_coupon
FROM mock_company.dim_country c
    JOIN mock_company.dim_store s ON c.country_id = s.country_id
    JOIN fact_sales_bronze f ON s.store_id = f.store_id
GROUP BY c.name, c.region, c.sub_region
ORDER BY total_sales DESC;

| country | total_sales | sales_with_coupon |
|---|---|---|
| United States | 16824 | 1604 |
| Canada | 3176 | 304 |
| United Kingdom | 3176 | 304 |
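The query walks the star schema from country to store to fact, then counts every sale and, through `SUM(CASE WHEN f.coupon_used = 1 THEN 1 ELSE 0 END)`, only the sales that used a coupon. The plain-Python sketch below mirrors that logic on a few hypothetical fact rows; the store-to-country mapping comes from the dim_store output above. It illustrates the query's semantics, not how Spark executes it.

```python
from collections import defaultdict


def sales_by_country(countries, stores, facts):
    """Plain-Python mirror of the bronze query: inner joins, COUNT, conditional SUM."""
    country_name = {c["country_id"]: c["name"] for c in countries}
    store_country = {s["store_id"]: s["country_id"] for s in stores}

    totals = defaultdict(lambda: [0, 0])  # country -> [total_sales, sales_with_coupon]
    for f in facts:
        country_id = store_country.get(f["store_id"])
        if country_id not in country_name:
            continue  # INNER JOIN: facts with no matching store/country are dropped
        row = totals[country_name[country_id]]
        row[0] += 1  # COUNT(f.fact_sale_id), assuming ids are never null
        row[1] += 1 if f["coupon_used"] == 1 else 0  # SUM(CASE WHEN ... THEN 1 ELSE 0 END)

    # ORDER BY total_sales DESC
    return sorted(((name, t, c) for name, (t, c) in totals.items()),
                  key=lambda r: r[1], reverse=True)


countries = [{"country_id": 235, "name": "United Kingdom"},
             {"country_id": 236, "name": "United States"}]
stores = [{"store_id": 1, "country_id": 236}, {"store_id": 3, "country_id": 235}]
facts = [  # hypothetical sample rows
    {"fact_sale_id": 1, "store_id": 1, "coupon_used": 1},
    {"fact_sale_id": 2, "store_id": 1, "coupon_used": 0},
    {"fact_sale_id": 3, "store_id": 3, "coupon_used": 0},
    {"fact_sale_id": 4, "store_id": 99, "coupon_used": 1},  # no such store
]
print(sales_by_country(countries, stores, facts))
# [('United States', 2, 1), ('United Kingdom', 1, 0)]
```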


In [0]:
%sql
-- total number of sales and sales with coupon by region in 2016

SELECT 
    c.region,
    d.year,
    COUNT(f.fact_sale_id) AS total_sales,
    SUM(CASE WHEN 
            f.coupon_used = 1 THEN 1 
                ELSE 0 
            END) 
        AS sales_with_coupon
FROM mock_company.dim_country c
    JOIN mock_company.dim_store s ON c.country_id = s.country_id
    JOIN fact_sales_bronze f ON s.store_id = f.store_id
    JOIN mock_company.dim_date d ON f.date_id = d.date_id
WHERE d.year = 2016
GROUP BY c.region, d.year
ORDER BY d.year, total_sales DESC;

| region | year | total_sales | sales_with_coupon |
|---|---|---|---|
| Americas | 2016 | 4040 | 380 |
| Europe | 2016 | 632 | 64 |
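The `d.year = 2016` filter works through the date dimension: only facts whose `date_id` belongs to a 2016 row of dim_date survive the join. A hypothetical sketch of that resolution step follows. The 2015 and 2017 rows match the dim_date output above; the sample shown there contains no 2016 date, so the 2016 row and the fact rows are made up for illustration.

```python
def facts_in_year(facts, dates, year):
    """Keep facts whose date_id maps to `year` (JOIN dim_date ... WHERE d.year = year)."""
    ids_in_year = {d["date_id"] for d in dates if d["year"] == year}
    return [f for f in facts if f["date_id"] in ids_in_year]


dates = [
    {"date_id": 1, "year": 2015},
    {"date_id": 3, "year": 2017},
    {"date_id": 1000, "year": 2016},  # hypothetical 2016 row
]
facts = [  # hypothetical fact rows
    {"fact_sale_id": 1, "date_id": 1},
    {"fact_sale_id": 2, "date_id": 1000},
    {"fact_sale_id": 3, "date_id": 3},
]
print([f["fact_sale_id"] for f in facts_in_year(facts, dates, 2016)])
# [2]
```

The surviving facts are then grouped by region in the same way the country query groups by country.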


### Merging Data Into Silver Table

In [0]:
(spark.readStream
  .table("fact_sales_bronze")
  .createOrReplaceTempView("sales_silver_tempview"))

In [0]:
%sql
SELECT * FROM sales_silver_tempview

coupon_used,date_id,fact_sale_id,purchase_method,store_id,_rescued_data,receipt_time,source_file
1,1,1,Online,1,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,2,2,Phone,2,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,3,3,In store,1,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,4,4,In store,2,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,5,5,In store,3,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,5,6,In store,4,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,6,7,In store,3,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,6,8,In store,4,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,7,9,In store,3,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json
0,7,10,In store,4,,2024-12-09T23:19:15.174Z,dbfs:/FileStore/project_data/stream/fact_sale_1.json


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_sales_silver_tempview AS (
  SELECT fact_sale_id,
  coupon_used,
  d.full_date,
  d.year,
  d.quarter,
  d.month,
  d.month_name,
  purchase_method,
  s.store_location,
  c.name,
  c.country_code,
  c.region,
  c.region_code,
  c.sub_region,
  c.sub_region_code,
  receipt_time,
  source_file
  FROM sales_silver_tempview
  INNER JOIN mock_company.dim_date d ON sales_silver_tempview.date_id = d.date_id
  INNER JOIN mock_company.dim_store s ON sales_silver_tempview.store_id = s.store_id
  INNER JOIN mock_company.dim_country c ON s.country_id = c.country_id
);
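The silver view flattens the star schema: each fact row is widened with the date, store, and country attributes it points to, so later queries can aggregate `fact_sales_silver` without joins. Because all three joins are `INNER JOIN`s, a fact whose `date_id` or `store_id` has no dimension match is dropped rather than kept with nulls. The hypothetical plain-Python sketch below illustrates that behavior while carrying over only a few of the view's columns. The matched fact and dimension values come from this notebook's outputs; the unmatched fact is made up.

```python
def denormalize(facts, dates, stores, countries):
    """Widen each fact with dimension attributes; unmatched facts are dropped (INNER JOIN)."""
    date_by_id = {d["date_id"]: d for d in dates}
    store_by_id = {s["store_id"]: s for s in stores}
    country_by_id = {c["country_id"]: c for c in countries}

    rows = []
    for f in facts:
        d = date_by_id.get(f["date_id"])
        s = store_by_id.get(f["store_id"])
        c = country_by_id.get(s["country_id"]) if s is not None else None
        if d is None or s is None or c is None:
            continue  # a missing dimension match removes the fact from the result
        rows.append({
            "fact_sale_id": f["fact_sale_id"],
            "coupon_used": f["coupon_used"],
            "full_date": d["full_date"],
            "year": d["year"],
            "store_location": s["store_location"],
            "name": c["name"],
            "region": c["region"],
        })
    return rows


dates = [{"date_id": 1, "full_date": "2015-03-23", "year": 2015}]
stores = [{"store_id": 1, "country_id": 236, "store_location": "Denver"}]
countries = [{"country_id": 236, "name": "United States", "region": "Americas"}]
facts = [
    {"fact_sale_id": 1, "coupon_used": 1, "date_id": 1, "store_id": 1},
    {"fact_sale_id": 9999, "coupon_used": 0, "date_id": 999, "store_id": 1},  # hypothetical, no matching date
]
rows = denormalize(facts, dates, stores, countries)
print(len(rows), rows[0]["name"], rows[0]["store_location"])
# 1 United States Denver
```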

In [0]:
(spark.table("fact_sales_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_silver}/_checkpoint")
      .option("mergeSchema", "true")
      .outputMode("append")
      .table("fact_sales_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc0d85d2710>

In [0]:
%sql
SELECT * FROM fact_sales_silver

fact_sale_id,coupon_used,date_id,purchase_method,store_id,receipt_time,source_file,full_date,year,quarter,month,month_name,store_location,name,country_code,region,region_code,sub_region,sub_region_code
1933,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2017-09-16,2017.0,3.0,9.0,September,New York,United States,USA,Americas,19.0,Northern America,21.0
1934,1,,Online,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2014-07-02,2014.0,3.0,7.0,July,Seattle,United States,USA,Americas,19.0,Northern America,21.0
1935,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2017-07-12,2017.0,3.0,7.0,July,Denver,United States,USA,Americas,19.0,Northern America,21.0
1936,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2017-01-02,2017.0,1.0,1.0,January,Denver,United States,USA,Americas,19.0,Northern America,21.0
1937,1,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2017-09-14,2017.0,3.0,9.0,September,Austin,United States,USA,Americas,19.0,Northern America,21.0
1938,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2017-03-04,2017.0,1.0,3.0,March,New York,United States,USA,Americas,19.0,Northern America,21.0
1939,0,,Online,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2015-09-16,2015.0,3.0,9.0,September,Denver,United States,USA,Americas,19.0,Northern America,21.0
1940,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2017-02-24,2017.0,1.0,2.0,February,Seattle,United States,USA,Americas,19.0,Northern America,21.0
1941,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2013-11-21,2013.0,4.0,11.0,November,Seattle,United States,USA,Americas,19.0,Northern America,21.0
1942,0,,In store,,2024-12-10T00:30:53.946Z,dbfs:/FileStore/project_data/stream/fact_sale_2.json,2016-08-05,2016.0,3.0,8.0,August,Austin,United States,USA,Americas,19.0,Northern America,21.0


### Silver Tables - Performing Aggregations and Getting Data from the Fact Table

In [0]:
%sql
-- total number of sales & sales with coupon by country
SELECT 
    f.name AS country,
    COUNT(f.fact_sale_id) AS total_sales,
    SUM(CASE WHEN 
            f.coupon_used = 1 THEN 1 
                ELSE 0 
        END) 
    AS sales_with_coupon
FROM fact_sales_silver f
GROUP BY f.name
ORDER BY total_sales DESC;

| country | total_sales | sales_with_coupon |
|---|---|---|
| United States | 16824 | 1604 |
| null | 5794 | 553 |
| Canada | 3176 | 304 |
| United Kingdom | 3176 | 304 |


The query may return a `null` country row, as above, if it runs while the stream is still initializing. This does not affect the other rows, whose counts match the bronze results.
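If the `null` row gets in the way, the aggregation can leave it out, for example by adding `WHERE f.name IS NOT NULL` before `GROUP BY`. SQL's `GROUP BY` puts all NULL keys into a single group, which is why they show up as one blank-country row. A plain-Python sketch of grouping with and without that filter, using hypothetical rows and a hypothetical helper name:

```python
from collections import Counter


def sales_per_country(rows, drop_null=False):
    """Count rows per country name; None plays the role of SQL NULL.

    Like SQL GROUP BY, all None names land in a single group unless filtered out,
    which mirrors adding WHERE f.name IS NOT NULL to the query.
    """
    names = (r["name"] for r in rows if not (drop_null and r["name"] is None))
    return dict(Counter(names))


rows = [{"name": "United States"}, {"name": None}, {"name": None}, {"name": "Canada"}]
print(sales_per_country(rows))                  # {'United States': 1, None: 2, 'Canada': 1}
print(sales_per_country(rows, drop_null=True))  # {'United States': 1, 'Canada': 1}
```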

In [0]:
%sql
-- total number of sales and sales with coupon by region in 2016

SELECT 
    f.region,
    f.year,
    COUNT(f.fact_sale_id) AS total_sales,
    SUM(CASE WHEN 
            f.coupon_used = 1 THEN 1 
                ELSE 0 
            END) 
        AS sales_with_coupon
FROM fact_sales_silver f
WHERE f.year = 2016
GROUP BY f.region, f.year
ORDER BY f.year, total_sales DESC;

| region | year | total_sales | sales_with_coupon |
|---|---|---|---|
| Americas | 2016 | 4040 | 380 |
| Europe | 2016 | 632 | 64 |
