## Captsone Project: Lakehouse with Structured Streaming
This project is to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. 

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
%python
%pip install pymongo

import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "upn2vz-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "mystore"

connection_properties = {
  "user" : "upn2vzatvirginia",
  "password" : "Akinolami6650!",
  "driver" : "org.mariadb.jdbc.Driver"
}


# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.mkvm3"
atlas_database_name = "northwind_dw2"
atlas_user_name = "upn2vz"
atlas_password = "3yseiHK29Bemx2qK"


user_name = "upn2vz"
pwd = "3yseiHK29Bemx2qK"
cluster_name = "cluster0.mkvm3"
atlas_uri = f"mongodb+srv://{user_name}:{pwd}@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"

# Data Files (JSON) Information ###############################
dst_database = "mystore"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_purchase_orders", True) 
dbutils.fs.rm(f"{database_dir}/fact_inventory_transactions", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
DROP DATABASE IF EXISTS mystore CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS mystore
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/lab_data/mystore"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://wna8fw-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_date",
  user "jtupitza",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE mystore;

CREATE OR REPLACE TABLE mystore.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/mystore/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED mystore.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM mystore.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 1.3. Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_product" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://upn2vz-mysql.mysql.database.azure.com:3306/mystore",
  dbtable "dim_products", 
  user "upn2vzatvirginia",
  password "Akinolami6650!"
)




In [0]:
%sql
USE DATABASE mystore;

-- Create a new table named "northwind_dlh.dim_product" using data from the view named "view_product"

CREATE OR REPLACE TABLE northwind_dlh.dim_products 
COMMENT "Product Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/northwind_dlh/dim_product"
AS SELECT * FROM view_product


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED mystore.dim_products;

col_name,data_type,comment
product_key,bigint,
product_id,bigint,
product_code,varchar(65535),
product_name,varchar(65535),
standard_cost,double,
list_price,double,
reorder_level,bigint,
target_level,bigint,
quantity_per_unit,varchar(65535),
discontinued,bigint,


In [0]:
%sql
SELECT * FROM mystore.dim_products LIMIT 5

product_key,product_id,product_code,product_name,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category
1,1,NWTB-1,Northwind Traders Chai,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages
2,3,NWTCO-3,Northwind Traders Syrup,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments
3,4,NWTCO-4,Northwind Traders Cajun Seasoning,16.5,22.0,10,40,48 - 6 oz jars,0,10.0,Condiments
4,5,NWTO-5,Northwind Traders Olive Oil,16.0125,21.35,10,40,36 boxes,0,10.0,Oil
5,6,NWTJP-6,Northwind Traders Boysenberry Spread,18.75,25.0,25,100,12 - 8 oz jars,0,25.0,"Jams, Preserves"


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))  # '/dbfs/FileStore/lab_data/retail/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimCustomers.json,Northwind_DimCustomers.json,10476,1732744914000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimEmployees.csv,Northwind_DimEmployees.csv,2164,1732744914000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimInvoices.json,Northwind_DimInvoices.json,6263,1732744914000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimShippers.csv,Northwind_DimShippers.csv,262,1732744914000
dbfs:/FileStore/lab_data/retail/batch/Northwind_DimSuppliers.json,Northwind_DimSuppliers.json,1480,1732744914000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/lab_data/retail/batch'
json_files = {"customers" : 'Northwind_DimCustomers.json'
              , "suppliers" : 'Northwind_DimSuppliers.json'
              , "invoices" : 'Northwind_DimInvoices.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

InsertManyResult([ObjectId('6747abe575d8d3ac57a2f9a4'), ObjectId('6747abe575d8d3ac57a2f9a5'), ObjectId('6747abe575d8d3ac57a2f9a6'), ObjectId('6747abe575d8d3ac57a2f9a7'), ObjectId('6747abe575d8d3ac57a2f9a8'), ObjectId('6747abe575d8d3ac57a2f9a9'), ObjectId('6747abe575d8d3ac57a2f9aa'), ObjectId('6747abe575d8d3ac57a2f9ab'), ObjectId('6747abe575d8d3ac57a2f9ac'), ObjectId('6747abe575d8d3ac57a2f9ad'), ObjectId('6747abe575d8d3ac57a2f9ae'), ObjectId('6747abe575d8d3ac57a2f9af'), ObjectId('6747abe575d8d3ac57a2f9b0'), ObjectId('6747abe575d8d3ac57a2f9b1'), ObjectId('6747abe575d8d3ac57a2f9b2'), ObjectId('6747abe575d8d3ac57a2f9b3'), ObjectId('6747abe575d8d3ac57a2f9b4'), ObjectId('6747abe575d8d3ac57a2f9b5'), ObjectId('6747abe575d8d3ac57a2f9b6'), ObjectId('6747abe575d8d3ac57a2f9b7'), ObjectId('6747abe575d8d3ac57a2f9b8'), ObjectId('6747abe575d8d3ac57a2f9b9'), ObjectId('6747abe575d8d3ac57a2f9ba'), ObjectId('6747abe575d8d3ac57a2f9bb'), ObjectId('6747abe575d8d3ac57a2f9bc'), ObjectId('6747abe575d8d3ac57a2f9

##### 2.3.1. Fetch Customer Dimension Data from the New MongoDB Collection

In [0]:
from pyspark.sql import SparkSession

# Connection parameters
user_name = "upn2vz"
pwd = "3yseiHK29Bemx2qK"
cluster_name = "cluster0.mkvm3"
atlas_uri = f"mongodb+srv://{user_name}:{pwd}@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDB Connection") \
    .config("spark.mongodb.input.uri", atlas_uri) \
    .getOrCreate()

# Read data from MongoDB
df_customer = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://upn2vz:3yseiHK29Bemx2qK@cluster0.mkvm3.mongodb.net/myshop?retryWrites=true&w=majority") \
    .option("collection", "customers") \
    .load() \
    .select("customer_key", "company", "last_name", "first_name", "job_title",
            "business_phone", "fax_number", "address", "city", "state_province",
            "zip_postal_code", "country_region")

# Display first 5 rows
df_customer.limit(5).show()

# Display all records
df_customer.show()

+------------+---------+----------------+----------+--------------------+--------------+-------------+--------------+-----------+--------------+---------------+--------------+
|customer_key|  company|       last_name|first_name|           job_title|business_phone|   fax_number|       address|       city|state_province|zip_postal_code|country_region|
+------------+---------+----------------+----------+--------------------+--------------+-------------+--------------+-----------+--------------+---------------+--------------+
|           1|Company A|          Bedecs|      Anna|               Owner| (123)555-0100|(123)555-0101|123 1st Street|    Seattle|            WA|          99999|           USA|
|           2|Company B|Gratacos Solsona|   Antonio|               Owner| (123)555-0100|(123)555-0101|123 2nd Street|     Boston|            MA|          99999|           USA|
|           3|Company C|            Axen|    Thomas|Purchasing Repres...| (123)555-0100|(123)555-0101|123 3rd Street|Los

##### 2.3.2. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
from pyspark.sql import SparkSession

# Connection parameters
user_name = "upn2vz"
pwd = "3yseiHK29Bemx2qK"
cluster_name = "cluster0.mkvm3"
atlas_uri = f"mongodb+srv://{user_name}:{pwd}@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDB Connection") \
    .config("spark.mongodb.input.uri", atlas_uri) \
    .getOrCreate()

# Read data from MongoDB
df_customer = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://jtupitza:Passw0rd1234@sandbox.zibbf.mongodb.net/northwind_dw2?retryWrites=true&w=majority") \
    .option("collection", "customers") \
    .load() \
    .select("customer_key", "company", "last_name", "first_name", "job_title",
            "business_phone", "fax_number", "address", "city", "state_province",
            "zip_postal_code", "country_region")

# Display first 5 rows
df_customer.limit(5).show()

# Display all records
df_customer.show()

df_customer.write.format("delta").mode("overwrite").saveAsTable("myshop.dim_customer")

+------------+---------+----------------+----------+--------------------+--------------+-------------+--------------+-----------+--------------+---------------+--------------+
|customer_key|  company|       last_name|first_name|           job_title|business_phone|   fax_number|       address|       city|state_province|zip_postal_code|country_region|
+------------+---------+----------------+----------+--------------------+--------------+-------------+--------------+-----------+--------------+---------------+--------------+
|           1|Company A|          Bedecs|      Anna|               Owner| (123)555-0100|(123)555-0101|123 1st Street|    Seattle|            WA|          99999|           USA|
|           2|Company B|Gratacos Solsona|   Antonio|               Owner| (123)555-0100|(123)555-0101|123 2nd Street|     Boston|            MA|          99999|           USA|
|           3|Company C|            Axen|    Thomas|Purchasing Repres...| (123)555-0100|(123)555-0101|123 3rd Street|Los

In [0]:
%sql
DESCRIBE EXTENDED myshop.dim_customer

col_name,data_type,comment
customer_key,int,
company,string,
last_name,string,
first_name,string,
job_title,string,
business_phone,string,
fax_number,string,
address,string,
city,string,
state_province,string,


In [0]:
%sql
SELECT * FROM myshop.dim_customer LIMIT 5

customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA
3,Company C,Axen,Thomas,Purchasing Representative,(123)555-0100,(123)555-0101,123 3rd Street,Los Angelas,CA,99999,USA
4,Company D,Lee,Christina,Purchasing Manager,(123)555-0100,(123)555-0101,123 4th Street,New York,NY,99999,USA
5,Company E,O’Donnell,Martin,Owner,(123)555-0100,(123)555-0101,123 5th Street,Minneapolis,MN,99999,USA


##### 2.4.1 Fetch Supplier Dimension Data from the New MongoDB Collection

In [0]:
from pyspark.sql import SparkSession

# Connection parameters
user_name = "upn2vz"
pwd = "3yseiHK29Bemx2qK"
cluster_name = "cluster0.mkvm3"
atlas_uri = f"mongodb+srv://{user_name}:{pwd}@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"


# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDB Connection") \
    .config("spark.mongodb.input.uri", atlas_uri) \
    .getOrCreate()

# Read data from MongoDB
df_supplier = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://upn2vz:3yseiHK29Bemx2qK@cluster0.mkvm3.mongodb.net/myshop?retryWrites=true&w=majority") \
    .option("collection", "suppliers") \
    .load() \
    .select("last_name", "first_name", "company")

# Display first 5 rows
df_supplier.limit(5).show()

# Display all records
df_supplier.show()

df_supplier.write.format("delta").mode("overwrite").saveAsTable("myshop.dim_supplier")

+--------------------+------------+----------+
|           last_name|  first_name|   company|
+--------------------+------------+----------+
|            Andersen|Elizabeth A.|Supplier A|
|              Weiler|    Cornelia|Supplier B|
|              Kelley|   Madeleine|Supplier C|
|                Sato|       Naoki|Supplier D|
|Hernandez-Echevarria|       Amaya|Supplier E|
+--------------------+------------+----------+

+--------------------+------------+----------+
|           last_name|  first_name|   company|
+--------------------+------------+----------+
|            Andersen|Elizabeth A.|Supplier A|
|              Weiler|    Cornelia|Supplier B|
|              Kelley|   Madeleine|Supplier C|
|                Sato|       Naoki|Supplier D|
|Hernandez-Echevarria|       Amaya|Supplier E|
|            Hayakawa|      Satomi|Supplier F|
|             Glasson|      Stuart|Supplier G|
|              Dunton|   Bryn Paul|Supplier H|
|            Sandberg|      Mikael|Supplier I|
|           

##### 2.4.2. Use the Spark DataFrame to Create a New Suppliers Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, sha2, current_timestamp


spark = SparkSession.builder \
    .appName("Supplier Dimension Processing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
supplier_json = f"{batch_dir}/Northwind_DimSuppliers.json"

# Read supplier data from JSON
df_supplier = spark.read \
    .format('json') \
    .option("multiline", "true") \
    .load(supplier_json)

df_supplier_enhanced = df_supplier \
    .withColumn("supplier_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .withColumn("supplier_key", sha2(concat_ws("||", col("company"), col("supplier_name")), 256)) \
    .withColumn("created_timestamp", current_timestamp()) \
    .withColumn("modified_timestamp", current_timestamp())

df_supplier_dim = df_supplier_enhanced.select(
    "supplier_key",
    "company",
    "supplier_name",
    "first_name",
    "last_name",
    "created_timestamp",
    "modified_timestamp"
)

# Write to Delta table
df_supplier_dim.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("mystore.dim_supplier")

# Verify the table was created
print("Row count in dim_supplier:", spark.table("mystore.dim_supplier").count())
# Display sample records
spark.table("mystore.dim_supplier").show(5)

Row count in dim_supplier: 10
+--------------------+----------+--------------------+------------+--------------------+--------------------+--------------------+
|        supplier_key|   company|       supplier_name|  first_name|           last_name|   created_timestamp|  modified_timestamp|
+--------------------+----------+--------------------+------------+--------------------+--------------------+--------------------+
|65a35e23007d11d8f...|Supplier A|Elizabeth A. Ande...|Elizabeth A.|            Andersen|2024-11-28 04:18:...|2024-11-28 04:18:...|
|07268d6f6d40db2dc...|Supplier B|     Cornelia Weiler|    Cornelia|              Weiler|2024-11-28 04:18:...|2024-11-28 04:18:...|
|0a62dc2cef2727b6e...|Supplier C|    Madeleine Kelley|   Madeleine|              Kelley|2024-11-28 04:18:...|2024-11-28 04:18:...|
|9ae8a31c935f9779b...|Supplier D|          Naoki Sato|       Naoki|                Sato|2024-11-28 04:18:...|2024-11-28 04:18:...|
|0e4bc2b4da1a27901...|Supplier E|Amaya Hernandez-E...

In [0]:
%sql
DESCRIBE EXTENDED mystore.dim_supplier

col_name,data_type,comment
supplier_key,string,
company,string,
supplier_name,string,
first_name,string,
last_name,string,
created_timestamp,timestamp,
modified_timestamp,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"first_name, modified_timestamp, company, last_name, supplier_name, created_timestamp, supplier_key",


In [0]:
%sql
SELECT * FROM mystore.dim_supplier LIMIT 5

supplier_key,company,last_name,first_name,job_title
1,Supplier A,Andersen,Elizabeth A.,Sales Manager
2,Supplier B,Weiler,Cornelia,Sales Manager
3,Supplier C,Kelley,Madeleine,Sales Representative
4,Supplier D,Sato,Naoki,Marketing Manager
5,Supplier E,Hernandez-Echevarria,Amaya,Sales Manager


##### 2.5.1 Fetch Invoice Dimension Data from teh New MongoDB Collection

In [0]:
from pyspark.sql import SparkSession

# Connection parameters
user_name = "upn2vz"
pwd = "3yseiHK29Bemx2qK"
cluster_name = "cluster0.mkvm3"
atlas_uri = f"mongodb+srv://{user_name}:{pwd}@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDB Connection") \
    .config("spark.mongodb.input.uri", atlas_uri) \
    .getOrCreate()

# Read data from MongoDB
df_invoice = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://upn2vz:3yseiHK29Bemx2qK@cluster0.mkvm3.mongodb.net/myshop?retryWrites=true&w=majority") \
    .option("collection", "invoices") \
    .load() \
    .select("invoice_key",
        "invoice_date", 
        "due_date",
        "tax",
        "shipping",
        "amount_due")

# Display first 5 rows
df_invoice.limit(5).show()

# Display all records
df_invoice.show()

+-----------+-------------------+-------------------+---+--------+----------+
|invoice_key|       invoice_date|           due_date|tax|shipping|amount_due|
+-----------+-------------------+-------------------+---+--------+----------+
|          5|2006-03-22 16:08:59|2006-04-04 11:43:08|  0|       0|         0|
|          6|2006-03-22 16:10:27|2006-04-04 11:43:08|  0|       0|         0|
|          7|2006-03-24 10:41:41|2006-04-04 11:43:08|  0|       0|         0|
|          8|2006-03-24 10:55:46|2006-04-04 11:43:08|  0|       0|         0|
|          9|2006-03-24 10:56:57|2006-04-04 11:43:08|  0|       0|         0|
+-----------+-------------------+-------------------+---+--------+----------+

+-----------+-------------------+-------------------+---+--------+----------+
|invoice_key|       invoice_date|           due_date|tax|shipping|amount_due|
+-----------+-------------------+-------------------+---+--------+----------+
|          5|2006-03-22 16:08:59|2006-04-04 11:43:08|  0|      

##### 2.5.2. Use the Spark DataFrame to Create a New Invoices Dimension Table in the Databricks Metadata Database (northwind_dlh)

In [0]:
from pyspark.sql import SparkSession



user_name = "upn2vz"
pwd = "3yseiHK29Bemx2qK"
cluster_name = "cluster0.mkvm3"
atlas_uri = f"mongodb+srv://{user_name}:{pwd}@{cluster_name}.mongodb.net/?retryWrites=true&w=majority"


spark = SparkSession.builder \
    .appName("MongoDB Invoice Processing") \
    .config("spark.mongodb.input.uri", atlas_uri) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


df_invoice = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://upn2vz:3yseiHK29Bemx2qK@cluster0.mkvm3.mongodb.net/myshop?retryWrites=true&w=majority") \
    .option("database", "myshop") \
    .option("collection", "invoices") \
    .load() \
    .select("invoice_key",
            "invoice_date",
            "due_date",
            "tax",
            "shipping",
            "amount_due")

# Write to Delta table
df_invoice.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("myshop.dim_invoice")


print("Number of records written:", df_invoice.count())
spark.table("myshop.dim_invoice").show(5)

Number of records written: 35
+-----------+---------+-------------------+-------------------+---+--------+----------+
|invoice_key|order_key|       invoice_date|           due_date|tax|shipping|amount_due|
+-----------+---------+-------------------+-------------------+---+--------+----------+
|          5|     NULL|2006-03-22 16:08:59|2006-04-04 11:43:08|  0|       0|         0|
|          6|     NULL|2006-03-22 16:10:27|2006-04-04 11:43:08|  0|       0|         0|
|          7|     NULL|2006-03-24 10:41:41|2006-04-04 11:43:08|  0|       0|         0|
|          8|     NULL|2006-03-24 10:55:46|2006-04-04 11:43:08|  0|       0|         0|
|          9|     NULL|2006-03-24 10:56:57|2006-04-04 11:43:08|  0|       0|         0|
+-----------+---------+-------------------+-------------------+---+--------+----------+
only showing top 5 rows



In [0]:
%sql
DESCRIBE EXTENDED mystore.dim_invoice

col_name,data_type,comment
invoice_key,int,
order_key,int,
invoice_date,string,
due_date,string,
tax,int,
shipping,int,
amount_due,int,
,,
# Delta Statistics Columns,,
Column Names,"amount_due, order_key, invoice_key, tax, due_date, invoice_date, shipping",


In [0]:
%sql
SELECT * FROM mystore.dim_invoice LIMIT 5

invoice_key,order_key,invoice_date,due_date,tax,shipping,amount_due
5,31,2006-03-22 16:08:59,2006-04-04 11:43:08,0,0,0
6,32,2006-03-22 16:10:27,2006-04-04 11:43:08,0,0,0
7,40,2006-03-24 10:41:41,2006-04-04 11:43:08,0,0,0
8,39,2006-03-24 10:55:46,2006-04-04 11:43:08,0,0,0
9,38,2006-03-24 10:56:57,2006-04-04 11:43:08,0,0,0


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, concat_ws, col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Employee Dimension Processing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
employee_csv = f"{batch_dir}/Northwind_DimEmployees.csv"

# Read employee data from CSV
df_employee = spark.read \
    .format('csv') \
    .options(header='true', inferSchema='true') \
    .load(employee_csv)

# Add audit columns
df_employee_enhanced = df_employee \
    .withColumn("created_timestamp", current_timestamp()) \
    .withColumn("modified_timestamp", current_timestamp())

# Write to Delta table
df_employee_enhanced.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("mystore.dim_employee")

# Verify the table was created
print("Row count in dim_employee:", spark.table("mystore.dim_employee").count())
# Display sample records
spark.table("mystore.dim_employee").show(5)


print("\nTable Schema:")
spark.table("mystore.dim_employee").printSchema()

Row count in dim_employee: 9
+------------+-----------------+---------+----------+--------------------+--------------------+--------------+-------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+--------------------+--------------------+
|employee_key|          company|last_name|first_name|       email_address|           job_title|business_phone|   home_phone|   fax_number|       address|    city|state_province|zip_postal_code|country_region|            web_page|   created_timestamp|  modified_timestamp|
+------------+-----------------+---------+----------+--------------------+--------------------+--------------+-------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+--------------------+--------------------+
|           1|Northwind Traders|Freehafer|     Nancy|nancy@northwindtr...|Sales Representative| (123)555-0100|(123)555-0102|(123)555-0103|123 1st Avenue| S

In [0]:
df_employee.show()

+------------+-----------------+--------------+----------+--------------------+--------------------+--------------+-------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+
|employee_key|          company|     last_name|first_name|       email_address|           job_title|business_phone|   home_phone|   fax_number|       address|    city|state_province|zip_postal_code|country_region|            web_page|
+------------+-----------------+--------------+----------+--------------------+--------------------+--------------+-------------+-------------+--------------+--------+--------------+---------------+--------------+--------------------+
|           1|Northwind Traders|     Freehafer|     Nancy|nancy@northwindtr...|Sales Representative| (123)555-0100|(123)555-0102|(123)555-0103|123 1st Avenue| Seattle|            WA|          99999|           USA|#http://northwind...|
|           2|Northwind Traders|       Cencini|    Andrew|an

In [0]:
df_employee.write.format("delta").mode("overwrite").saveAsTable("mystore.dim_employee")

In [0]:
%sql
DESCRIBE EXTENDED mystore.dim_employee;

col_name,data_type,comment
employee_key,int,
company,string,
last_name,string,
first_name,string,
email_address,string,
job_title,string,
business_phone,string,
home_phone,string,
fax_number,string,
address,string,


In [0]:
%sql
SELECT * FROM mystore.dim_employee LIMIT 5;

employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
3,Northwind Traders,Kotas,Jan,jan@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 3rd Avenue,Redmond,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
4,Northwind Traders,Sergienko,Mariya,mariya@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 4th Avenue,Kirkland,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#
5,Northwind Traders,Thorpe,Steven,steven@northwindtraders.com,Sales Manager,(123)555-0100,(123)555-0102,(123)555-0103,123 5th Avenue,Seattle,WA,99999,USA,http://northwindtraders.com#http://northwindtraders.com/#


##### 3.2 Use PySpark to Read Shipper Dimension Data from CSV File

In [0]:
shipper_csv = f"{batch_dir}/Northwind_DimShippers.csv"

df_shipper = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load(shipper_csv)

In [0]:
df_shipper.printSchema()

root
 |-- shipper_key: integer (nullable = true)
 |-- company: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- zip_postal_code: integer (nullable = true)
 |-- country_region: string (nullable = true)



In [0]:
df_shipper.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("mystore.dim_shipper")

In [0]:
%sql
DESCRIBE EXTENDED mystore.dim_shipper;

col_name,data_type,comment
shipper_key,int,
company,string,
address,string,
city,string,
state_province,string,
zip_postal_code,int,
country_region,string,
,,
# Delta Statistics Columns,,
Column Names,"shipper_key, city, zip_postal_code, state_province, country_region, company, address",


In [0]:
%sql
SELECT * FROM mystore.dim_shipper LIMIT 5;

shipper_key,company,address,city,state_province,zip_postal_code,country_region
1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA
3,Shipping Company C,123 Any Street,Memphis,TN,99999,USA


##### Verify Dimension Tables

In [0]:
%sql
USE mystore;
SHOW TABLES

database,tableName,isTemporary
northwind_dlh,dim_customer,False
northwind_dlh,dim_date,False
northwind_dlh,dim_employee,False
northwind_dlh,dim_invoice,False
northwind_dlh,dim_product,False
northwind_dlh,dim_shipper,False
northwind_dlh,dim_supplier,False
northwind_dlh,fact_monthly_orders_by_customer_gold,False
northwind_dlh,fact_orders_bronze,False
northwind_dlh,fact_orders_silver,False


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
dst_database = "mystore"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
data_dir = f"{base_dir}/retail"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "order_key BIGINT")
 #.option("cloudFiles.schemaHints", "employee_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT") 
 #.option("cloudFiles.schemaHints", "product_key BIGINT")
 #.option("cloudFiles.schemaHints", "shipper_key DECIMAL")
 #.option("cloudFiles.schemaHints", "order_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "paid_date_key DECIMAL")
 #.option("cloudFiles.schemaHints", "shipped_date_key DECIMAL") 
 #.option("cloudFiles.schemaHints", "quantity DECIMAL")
 #.option("cloudFiles.schemaHints", "unit_price DECIMAL")
 #.option("cloudFiles.schemaHints", "discount DECIMAL")
 #.option("cloudFiles.schemaHints", "shipping_fee DECIMAL")
 #.option("cloudFiles.schemaHints", "taxes DECIMAL")
 #.option("cloudFiles.schemaHints", "tax_rate DECIMAL")
 #.option("cloudFiles.schemaHints", "payment_type STRING")
 #.option("cloudFiles.schemaHints", "order_status STRING")
 #.option("cloudFiles.schemaHints", "order_details_status STRING")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
-- Create the orders_raw_tempview first
CREATE OR REPLACE TEMPORARY VIEW orders_raw_tempview AS
SELECT * FROM samples.tpch.orders;

-- Add Metadata for Traceability
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS
SELECT *, current_timestamp() AS receipt_time, input_file_name() AS source_file
FROM orders_raw_tempview;

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment,receipt_time,source_file
13710944,227285,O,162169.66,1995-10-11,1-URGENT,Clerk#000000432,0,accounts. ruthlessly regular accounts alongside of the car,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710945,225010,O,252273.67,1997-09-29,5-LOW,Clerk#000002337,0,ironic platelets snooze slyly. instru,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710946,238820,O,179947.16,1997-10-31,2-HIGH,Clerk#000004135,0,ole requests. regularly,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710947,581233,O,33843.49,1995-05-25,2-HIGH,Clerk#000000138,0,arefully final platelets. carefully express packages boost careful,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710948,10033,O,42500.65,1995-09-04,4-NOT SPECIFIED,Clerk#000003398,0,regular requests use furiously. fluffily,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710949,615502,O,48225.35,1995-07-13,3-MEDIUM,Clerk#000004639,0,ate quickly along the enticing ideas. furiously i,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710950,710665,F,265761.0,1992-11-29,2-HIGH,Clerk#000000735,0,", sly ideas among the ideas promise furiously about the furiously e",2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710951,382528,F,137666.86,1993-05-21,5-LOW,Clerk#000000777,0,. blithely pending packages nag furiously against the carefully unusual ac,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710976,122618,O,158725.42,1998-03-06,4-NOT SPECIFIED,Clerk#000001281,0,ages. final packages wake carefully according,2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710977,575623,O,178703.66,1998-05-04,5-LOW,Clerk#000003371,0,", final requests hinder s",2024-11-28T04:34:32.264092Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet


In [0]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Orders Batch Processing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


orders_df = spark.read \
    .table("orders_bronze_tempview")

# Write to the target Delta table
orders_df.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("fact_orders_bronze")

# Optional: Verify the data
print("Number of records written:", spark.table("fact_orders_bronze").count())

Number of records written: 7500000


##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment,receipt_time,source_file
13710944,227285,O,162169.66,1995-10-11,1-URGENT,Clerk#000000432,0,accounts. ruthlessly regular accounts alongside of the car,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710945,225010,O,252273.67,1997-09-29,5-LOW,Clerk#000002337,0,ironic platelets snooze slyly. instru,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710946,238820,O,179947.16,1997-10-31,2-HIGH,Clerk#000004135,0,ole requests. regularly,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710947,581233,O,33843.49,1995-05-25,2-HIGH,Clerk#000000138,0,arefully final platelets. carefully express packages boost careful,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710948,10033,O,42500.65,1995-09-04,4-NOT SPECIFIED,Clerk#000003398,0,regular requests use furiously. fluffily,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710949,615502,O,48225.35,1995-07-13,3-MEDIUM,Clerk#000004639,0,ate quickly along the enticing ideas. furiously i,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710950,710665,F,265761.0,1992-11-29,2-HIGH,Clerk#000000735,0,", sly ideas among the ideas promise furiously about the furiously e",2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710951,382528,F,137666.86,1993-05-21,5-LOW,Clerk#000000777,0,. blithely pending packages nag furiously against the carefully unusual ac,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710976,122618,O,158725.42,1998-03-06,4-NOT SPECIFIED,Clerk#000001281,0,ages. final packages wake carefully according,2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet
13710977,575623,O,178703.66,1998-05-04,5-LOW,Clerk#000003371,0,", final requests hinder s",2024-11-28T04:37:58.293994Z,abfss://metastore@ucstprdeastus2.dfs.core.windows.net/defe8e97-ff8d-4821-bc27-1a17ce6f51cc/tables/f736c12a-7b72-4227-8121-189046733e83/part-00000-6aa986e6-438a-4b41-b103-ef5f09e1d660-c000.snappy.parquet


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
o_orderkey,bigint,
o_custkey,bigint,
o_orderstatus,string,
o_totalprice,"decimal(18,2)",
o_orderdate,date,
o_orderpriority,string,
o_clerk,string,
o_shippriority,int,
o_comment,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT 
    o.o_orderkey as fact_order_key,
    o.o_orderkey as order_key,
    e.employee_key,
    e.last_name AS employee_last_name,
    e.first_name AS employee_first_name,
    e.job_title AS employee_job_title,
    e.company AS employee_company,
    c.customer_key,
    c.last_name AS customer_last_name,
    c.first_name AS customer_first_name,
    o.o_orderdate as order_date_key,
    o.o_orderstatus as order_status
  FROM orders_silver_tempview AS o
  INNER JOIN mystore.dim_employee AS e
    ON e.employee_key = o.o_clerk 
  INNER JOIN mystore.dim_customer AS c
    ON c.customer_key = o.o_custkey
)

In [0]:
import shutil
from pathlib import Path

dst_database = "mystore"
base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"
orders_output_silver = f"{database_dir}/fact_orders/silver"
checkpoint_location = f"{orders_output_silver}/_checkpoint"

# Delete existing checkpoint if it exists
try:
    dbutils.fs.rm(checkpoint_location, recurse=True)
    print(f"Deleted existing checkpoint at {checkpoint_location}")
except Exception as e:
    print(f"No existing checkpoint found or error deleting: {str(e)}")


query = spark.table("fact_orders_silver_tempview") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_location) \
    .outputMode("append") \
    .table("fact_orders_silver")


query.awaitTermination()

Deleted existing checkpoint at dbfs:/FileStore/lab_data/northwind_dlh/fact_orders/silver/_checkpoint


In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,order_key,employee_key,employee_last_name,employee_first_name,employee_job_title,employee_company,customer_key,customer_last_name,customer_first_name,order_date_key,order_status


Databricks data profile. Run in Databricks to view.

In [0]:
%sql
DESCRIBE EXTENDED mystore.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
order_key,bigint,
employee_key,bigint,
employee_last_name,string,
employee_first_name,string,
employee_job_title,string,
employee_company,string,
customer_key,bigint,
customer_last_name,string,
customer_first_name,string,


##### 6.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the number of products sold per customer each Month, along with the Customers' ID, First & Last Name, and the Month in which the order was placed.

In [0]:
%sql
CREATE OR REPLACE TABLE mystore.fact_monthly_orders_by_customer_gold AS (
  SELECT customer_key AS CustomerID
    , customer_last_name AS LastName
    , customer_first_name AS FirstName
    , order_month_name AS OrderMonth
    , COUNT(product_key) AS ProductCount
  FROM mystore.fact_orders_silver
  GROUP BY CustomerID, LastName, FirstName, OrderMonth
  ORDER BY ProductCount DESC);

SELECT * FROM mystore.fact_monthly_orders_by_customer_gold;

CustomerID,LastName,FirstName,OrderMonth,ProductCount
26,Liu,Run,April,3
8,Andersen,Elizabeth,April,2
3,Axen,Thomas,April,2
4,Lee,Christina,April,1
9,Mortensen,Sven,June,1
29,Lee,Soo Jung,April,1
1,Bedecs,Anna,May,1
6,Pérez-Olaeta,Francisco,April,1
25,Rodman,John,April,1
6,Pérez-Olaeta,Francisco,June,1


In [0]:
%sql
CREATE OR REPLACE TABLE mystore.fact_product_orders_by_customer_gold AS (
  SELECT pc.CustomerID
    , os.customer_last_name AS CustomerName
    , os.product_key AS ProductNumber
    , pc.ProductCount
  FROM mystore.fact_orders_silver AS os
  INNER JOIN (
    SELECT customer_key AS CustomerID
    , COUNT(product_key) AS ProductCount
    FROM mystore.fact_orders_silver
    GROUP BY customer_key
  ) AS pc
  ON pc.CustomerID = os.customer_key
  ORDER BY ProductCount DESC);

SELECT * FROM mystore.fact_product_orders_by_customer_gold;

CustomerID,CustomerName,ProductNumber,ProductCount
26,Liu,5,4
26,Liu,41,4
26,Liu,40,4
26,Liu,6,4
28,Raghav,43,2
3,Axen,3,2
3,Axen,8,2
28,Raghav,40,2
6,Pérez-Olaeta,34,2
8,Andersen,8,2


#### 7.0. Use AutoLoader to Process Streaming (Hot Path) Purchase Orders Fact Data 
##### 7.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader to read in the JSON files in the "purchase_orders_stream_dir"
# directory and then create a TempView named "purchase_orders_raw_tempview".
# Be sure to set the "cloudFiles.schemaLocation" Option using the "purchase_orders_output_bronze" directory

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", purchase_orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(purchase_orders_stream_dir)
 .createOrReplaceTempView("purchase_orders_raw_tempview"))

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:431)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:458)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:537)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:507)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:611)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:631)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:271)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(Attr

In [0]:
%sql
/* Add Metadata for Traceability */

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
SELECT * FROM purchase_orders_bronze_tempview

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
(spark.table("purchase_orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_bronze"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

##### 7.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_purchase_orders_bronze")
  .createOrReplaceTempView("purchase_orders_silver_tempview"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
SELECT * FROM purchase_orders_silver_tempview

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
DESCRIBE EXTENDED purchase_orders_silver_tempview

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
-- Create a new Temporary View named "purchase_orders_silver_tempview" by selecting data from
-- "purchase_orders_silver_tempview" and joining it to the Supplier, Product, Employee and Date dimension tables.
-- Remember that the Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.
CREATE OR REPLACE TEMPORARY VIEW fact_purchase_orders_silver_tempview AS (
  SELECT po.purchase_order_key,
      po.employee_key,
      e.last_name AS employee_last_name,
      e.first_name AS employee_first_name,
      po.supplier_key,
      s.company AS supplier_company,
      po.product_key,
      p.product_name,
      p.standard_cost,
      p.list_price,
      po.submitted_date_key,
      sd.month_name AS submitted_month,
      sd.calendar_quarter AS submitted_quarter,
      po.creation_date_key,
      cd.month_name AS creation_month,
      cd.calendar_quarter AS creation_quarter,
      po.status_key,
      po.expected_date_key,
      po.shipping_fee,
      po.taxes,
      po.payment_date_key,
      po.payment_amount,
      po.payment_method,
      po.notes
  FROM purchase_orders_silver_tempview AS po
  INNER JOIN northwind_dlh.dim_employee AS e
  ON e.employee_key = po.employee_key
  INNER JOIN northwind_dlh.dim_supplier AS s 
  ON s.supplier_key = po.supplier_key
  INNER JOIN northwind_dlh.dim_product AS p
  ON p.product_key = po.product_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS sd
  ON sd.date_key = po.submitted_date_key
  LEFT OUTER JOIN northwind_dlh.dim_date AS cd
  ON cd.date_key = po.creation_date_key
)


com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
(spark.table("fact_purchase_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{purchase_orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_purchase_orders_silver"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
SELECT * FROM fact_purchase_orders_silver

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
DESCRIBE EXTENDED fact_purchase_orders_silver

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

##### 7.3. Gold Table: Perform Aggregations
Create a new Gold table using the CTAS approach. The table should include the total amount (total list price) of the purchase orders placed per Supplier for each Product. Include the Suppliers' Company Name, and the Product Name.

In [0]:
%sql
-- Author a query that returns the Total List Price grouped by Supplier and Product and sorted by Total List Price descending.
CREATE OR REPLACE TABLE northwind_dlh.fact_inventory_transaction_summary_gold AS (
  SELECT created_quarter AS Quarter,
         transaction_type AS TransactionType,
         product_name AS Product,
         SUM(quantity) AS TotalQuantity
  FROM northwind_dlh.fact_inventory_transactions_silver
  GROUP BY Quarter, TransactionType, Product
  ORDER BY TotalQuantity DESC
);

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

#### 8.0. Use AutoLoader to Process Streaming (Hot Path) Inventory Transactions Fact Data 
##### 8.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Use spark.readStream and the AutoLoader to read in the JSON files in the "inventory_trans_stream_dir"
# directory and then create a TempView named "inventory_transactions_raw_tempview".
# Be sure to set the "cloudFiles.schemaLocation" Option using the "inventory_trans_output_bronze" directory

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", inventory_trans_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(inventory_trans_stream_dir)
 .createOrReplaceTempView("inventory_transactions_raw_tempview"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
/* Add Metadata for Traceability */

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
SELECT * FROM inventory_transactions_raw_tempview

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
(spark.table("inventory_transactions_bronze_tempview")
     .writeStream
     .format("delta")
     .option("checkpointLocation", f"{inventory_trans_output_bronze}/_checkpoint")
     .outputMode("append")
     .table("fact_inventory_transactions_bronze"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

##### 8.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_inventory_transactions_bronze")
  .createOrReplaceTempView("inventory_transactions_silver_tempview"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
SELECT * FROM inventory_transactions_silver_tempview

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
DESCRIBE EXTENDED inventory_transactions_silver_tempview

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
-- Create a new Temporary View named "fact_inventory_transactions_silver_tempview" by selecting data from
-- "inventory_transactions_silver_tempview" and joining it to the Product and Data dimension tables.
-- Remember that the Date dimension can serve as a "Role Playing" dimension by being Joined upon multiple times.
CREATE OR REPLACE TEMPORARY VIEW fact_inventory_transactions_silver_tempview AS (
  SELECT 
      it.inventory_transaction_key,
      it.transaction_type,
      it.transaction_created_date_key,
      it.transaction_modified_date_key,
      it.product_key,
      it.quantity,
      it.comments,
      
      -- Product dimension attributes
      p.product_code,
      p.product_name,
      p.quantity_per_unit,
      p.discontinued,
      p.minimum_reorder_quantity,
      p.category,
      
      -- Created Date dimension attributes (role-playing)
      cd.date_key as created_date_key,
      cd.day_name_of_week as created_day_name_of_week,
      cd.calendar_year as created_year,
      
      md.date_key as modified_date_key,
      md.full_date_alternate_key as modified_full_date

  FROM inventory_transactions_silver_tempview it
  
  INNER JOIN mystore.dim_product p
  ON it.product_key = p.product_key
  
  LEFT OUTER JOIN  mystore.dim_date cd
  ON it.transaction_created_date_key = cd.date_key
  
  LEFT OUTER JOIN mystore.dim_date md
  ON it.transaction_modified_date_key = md.date_key
)

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
(spark.table("fact_inventory_transactions_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{inventory_trans_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_silver"))

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
SELECT * FROM fact_inventory_transactions_silver

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

In [0]:
%sql
DESCRIBE EXTENDED fact_inventory_transactions_silver

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

##### 8.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Author a query that returns the Total Quantity grouped by the Quarter Created, Inventory Transaction Type, and Product
-- Sort by the Total Quantity Descending
CREATE OR REPLACE TABLE mystore.fact_inventory_transaction_summary_gold AS (
  SELECT created_quarter AS Quarter,
         transaction_type AS TransactionType,
         product_name AS Product,
         SUM(quantity) AS TotalQuantity
  FROM mystore.fact_inventory_transactions_silver
  GROUP BY Quarter, TransactionType, Product
  ORDER BY TotalQuantity DESC
);

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou

#### 9.0. Clean up the File System

In [0]:
%fs rm -r /FileStore/lab_data/

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=wna8fw-mysql.mysql.database.azure.com)(port=3306)(type=master) : (conn=6095) Access denied for user 'jtupitza'@'172.177.235.203' (using password: YES)
Current charset is UTF-8. If password has been set using other charset, consider using option 'passwordCharacterEncoding'
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)
	at org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:197)
	at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1404)
	at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)
	at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)
	at org.mariadb.jdbc.Driver.connect(Driver.java:89)
	at org.apache.spark.sql.execution.datasou