### 1. Prerequisites

#### 1.1 Import Libraries

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 1.2 Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): none of jdbc_hostname / jdbc_port / src_database /
# connection_properties is referenced by any cell visible in this notebook. The
# %sql cells that read from MySQL embed their own JDBC options (database
# "grocerydb", user "dorothyphilip", a jdbc:mysql URL) instead of the database,
# "root" user and MariaDB driver configured here -- confirm which set is current.
jdbc_hostname = "dorothyphilip-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "grocery_inventory_db"

# Secrets may be supplied through environment variables so they need not live in
# the notebook; the literal fallbacks keep the current behavior unchanged.
# TODO: move these into a Databricks secret scope and drop the fallbacks.
connection_properties = {
  "user" : os.environ.get("MYSQL_USER", "root"),
  "password" : os.environ.get("MYSQL_PASSWORD", "Wishbone1"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.ab9ci"
atlas_database_name = "grocerydb_customers"
atlas_user_name = os.environ.get("ATLAS_USER", "dorothyphilip")
atlas_password = os.environ.get("ATLAS_PASSWORD", "Wishbone1")

# Data Files (JSON) Information ###############################
dst_database = "grocerydb"

base_dir = "dbfs:/FileStore/project_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/grocery"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}"
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete files left by a previous run #########################
# Removing database_dir recursively also removes the streaming outputs and
# checkpoints under f"{database_dir}/fact_orders", so that sub-tree does not
# need a separate delete.
dbutils.fs.rm(database_dir, True)

#### 1.3 Define Global Functions

In [None]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Parameters:
        user_id, pwd  -- Atlas credentials; percent-escaped before being placed in the URI.
        cluster_name  -- cluster host prefix, e.g. "cluster0.ab9ci" (".mongodb.net" is appended).
        db_name       -- database to query.
        collection    -- collection name within db_name.
        conditions    -- filter document for find(); a falsy value matches every document.
        projection    -- projection for find(); a falsy value returns every field.
        sort          -- specification passed to Cursor.sort(); a falsy value applies no sort.

    Returns:
        A DataFrame (built with the module-level ``pd``) from the list of matching documents.
    '''
    from urllib.parse import quote_plus

    # Credentials containing characters such as '@', ':' or '/' must be percent-escaped,
    # otherwise the connection string is mis-parsed.
    mongo_uri = f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each query option independently: the previous if/elif chain silently
        # ignored `conditions` unless `projection` was also given, and `sort` unless both were.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor while the client is still open.
        dframe = pd.DataFrame(list(cursor))
    finally:
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Replace MongoDB collections with the documents stored in local JSON files.

    Parameters:
        user_id, pwd   -- Atlas credentials; percent-escaped before being placed in the URI.
        cluster_name   -- cluster host prefix (".mongodb.net" is appended).
        db_name        -- target database.
        src_file_path  -- local directory that contains the JSON files.
        json_files     -- mapping of collection name -> JSON file name inside src_file_path.

    Each collection is dropped before its file is inserted, so re-running with the
    same inputs recreates the same collections.

    Returns:
        The InsertManyResult for the last collection loaded, or None when json_files is empty.
    '''
    from urllib.parse import quote_plus

    mongo_uri = f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    # Previously unbound (UnboundLocalError at the return) when json_files was empty.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)
            # insert_many() requires a list; also accept a file holding a single JSON object.
            if isinstance(documents, dict):
                documents = [documents]
            result = db[collection_name].insert_many(documents)
    finally:
        client.close()

    return result

### 2. Populate Dimensions by Ingesting Reference (Cold-path) Data 

#### 2.1 Fetch Reference Data From an Azure MySQL Database

##### 2.1.1. Create a New Databricks Metadata Database.

In [None]:
%sql
-- Start from a clean slate: remove the metadata database together with every object it contains.
DROP SCHEMA IF EXISTS grocery_dlh CASCADE;

In [None]:
%sql
-- Recreate the metadata database, storing its files under the project directory.
CREATE SCHEMA IF NOT EXISTS grocery_dlh
  COMMENT "DS-2002 Project 2 Database"
  LOCATION "dbfs:/FileStore/project_data/grocery_dlh"
  WITH DBPROPERTIES (
    contains_pii = true,
    purpose = "DS-2002 Capstone Project"
  );

##### 2.1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [None]:
%sql
-- Session-scoped view over the MySQL dim_date table, read through Spark's JDBC source.
-- NOTE(review): the credentials are embedded in the notebook; prefer a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  dbtable "dim_date",
  url "jdbc:mysql://dorothyphilip-mysql.mysql.database.azure.com:3306/grocerydb",  -- replace with your server name
  user "dorothyphilip",  -- replace with your user name
  password "Wishbone1"   -- replace with your password
)

In [None]:
%sql
USE DATABASE grocery_dlh;

-- Copy the rows exposed by view_date into a table stored at an explicit location.
CREATE OR REPLACE TABLE grocery_dlh.dim_date
  COMMENT "Date Dimension Table"
  LOCATION "dbfs:/FileStore/project_data/grocery_dlh/dim_date"
AS
SELECT *
FROM view_date

In [None]:
%sql
-- Column list plus table metadata (location, provider, properties).
DESCRIBE TABLE EXTENDED grocery_dlh.dim_date;

In [None]:
%sql
-- Quick sample of the loaded rows.
SELECT *
FROM grocery_dlh.dim_date
LIMIT 5

##### 2.1.3. Create a New Table that Sources Supplier Dimension Data from an Azure MySQL database.

In [None]:
%sql
-- Session-scoped view over the MySQL dim_supplier table, read through Spark's JDBC source.
-- NOTE(review): the credentials are embedded in the notebook; prefer a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_supplier
USING org.apache.spark.sql.jdbc
OPTIONS (
  dbtable "dim_supplier",
  url "jdbc:mysql://dorothyphilip-mysql.mysql.database.azure.com:3306/grocerydb",  -- replace with your server name
  user "dorothyphilip",  -- replace with your user name
  password "Wishbone1"   -- replace with your password
)

In [None]:
%sql
USE DATABASE grocery_dlh;

-- Copy the rows exposed by view_supplier into a table. Like dim_date, its files live
-- under project_data/grocery_dlh (the database LOCATION), not under a separate lab_data/
-- tree that the clean-up step at the end of the notebook would leave behind.
CREATE OR REPLACE TABLE grocery_dlh.dim_supplier
COMMENT "Supplier Dimension Table"
LOCATION "dbfs:/FileStore/project_data/grocery_dlh/dim_supplier"
AS SELECT * FROM view_supplier

In [None]:
%sql
-- Column list plus table metadata (location, provider, properties).
DESCRIBE TABLE EXTENDED grocery_dlh.dim_supplier;

In [None]:
%sql
-- Quick sample of the loaded rows.
SELECT *
FROM grocery_dlh.dim_supplier
LIMIT 5

#### 2.2 Fetch Reference Data from a MongoDB Atlas Database

##### 2.2.1 View the Data Files on the Databricks File System

In [None]:
display(dbutils.fs.ls(batch_dir))  # batch_dir is 'dbfs:/FileStore/project_data/grocery/batch'

##### 2.2.2 Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent: it drops each target collection before inserting the documents from the corresponding JSON file.

In [None]:
# set_mongo_collection() reads the files with Python's open(), which needs the local
# /dbfs/... path rather than the dbfs:/ URI. Deriving it from batch_dir keeps both
# spellings of the batch directory in sync ('/dbfs/FileStore/project_data/grocery/batch').
source_dir = batch_dir.replace("dbfs:/", "/dbfs/", 1)
json_files = {"customers" : 'dim_customer.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

##### 2.2.3 Fetch Customer Dimension Data from the New MongoDB Collection

In [None]:
%scala
import com.mongodb.spark._

// Atlas connection settings for the MongoDB Spark connector. These repeat the Python
// atlas_* globals defined at the top of the notebook; keep the two in sync.
// NOTE(review): the password is embedded in plain text; prefer a secret scope.
val userName = "dorothyphilip"
val pwd = "Wishbone1"
val clusterName = "cluster0.ab9ci"
val atlas_uri = "mongodb+srv://" + userName + ":" + pwd + "@" + clusterName +
  ".mongodb.net/?retryWrites=true&w=majority"

In [None]:
%scala

// Load the "customers" collection through the MongoDB connector and keep only the
// customer-dimension attributes.
val df_customer = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .options(Map(
    "spark.mongodb.input.uri" -> atlas_uri,
    "database" -> "grocerydb_customers",
    "collection" -> "customers"))
  .load()
  .select("customer_key", "customer_id", "first_name", "last_name", "birthdate", "sex",
          "street", "city", "zip", "country", "email", "phone")

display(df_customer)

In [None]:
%scala
// Same output as printSchema(): the schema rendered as an indented tree.
println(df_customer.schema.treeString)

##### 2.2.4. Use the Spark DataFrame to Create a New Customer Dimension Table in the Databricks Metadata Database

In [None]:
%scala
// Persist the customer DataFrame as a Delta table, replacing any previous contents.
df_customer.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("grocery_dlh.dim_customer")

In [None]:
%sql
-- Column list plus table metadata (location, provider, properties).
DESCRIBE TABLE EXTENDED grocery_dlh.dim_customer

#### 2.3 Fetch Data from a File System

##### 2.3.1 Use PySpark to Read Grocery Data From a JSON File

In [None]:
grocery_json = f"{batch_dir}/dim_grocery.json"

# The file holds JSON spanning multiple lines, so multiLine parsing is required.
df_grocery = spark.read.json(grocery_json, multiLine=True)
display(df_grocery)

In [None]:
# Print the schema Spark inferred while reading dim_grocery.json.
df_grocery.printSchema()

In [None]:
# Persist the grocery DataFrame as a Delta table, replacing any previous contents.
df_grocery.write.saveAsTable("grocery_dlh.dim_grocery", format="delta", mode="overwrite")

In [None]:
%sql
-- Column list plus table metadata (location, provider, properties).
DESCRIBE TABLE EXTENDED grocery_dlh.dim_grocery;

In [None]:
%sql
-- Quick sample of the loaded rows.
SELECT *
FROM grocery_dlh.dim_grocery
LIMIT 5;

##### 2.3.2 Use PySpark to Read Product Dimension Data from a CSV File

In [None]:
product_csv = f"{batch_dir}/dim_product.csv"

# Semicolon-delimited file with a header row; let Spark infer the column types.
df_product = spark.read.csv(product_csv, sep=';', header=True, inferSchema=True)
display(df_product)

In [None]:
# Print the schema inferred from the semicolon-delimited CSV (inferSchema='true').
df_product.printSchema()

In [None]:
# Persist the product DataFrame as a Delta table, replacing any previous contents.
df_product.write.saveAsTable("grocery_dlh.dim_product", format="delta", mode="overwrite")

In [None]:
%sql
-- Column list plus table metadata (location, provider, properties).
DESCRIBE TABLE EXTENDED grocery_dlh.dim_product;

In [None]:
%sql
-- Quick sample of the loaded rows.
SELECT *
FROM grocery_dlh.dim_product
LIMIT 5;

#### 2.4 Verify Dimension Tables

In [None]:
%sql
-- Make grocery_dlh the current database (later cells use unqualified table names),
-- then list what it now contains.
USE DATABASE grocery_dlh;
SHOW TABLES IN grocery_dlh

### 3. Integrate Reference Data with Real-Time Data

#### 3.1. Use Auto Loader to Process Streaming (Hot Path) Orders Fact Data

##### 3.1.1. Bronze Table: Process 'Raw' JSON Data

In [None]:
# Auto Loader ("cloudFiles") incrementally reads the JSON order files arriving in the
# stream directory; the inferred schema is tracked under the bronze output path.
(spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": orders_output_bronze,
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    })
    .load(orders_stream_dir)
    .createOrReplaceTempView("orders_raw_tempview"))

In [None]:
%sql
-- Tag every raw order with the time it was read and the file it came from (traceability).
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT
    *,
    current_timestamp() AS receipt_time,
    input_file_name() AS source_file
  FROM orders_raw_tempview
)

In [None]:
%sql
-- Live preview of the streaming bronze view.
SELECT *
FROM orders_bronze_tempview

In [None]:
# Continuously append the enriched raw orders to the bronze Delta table; the checkpoint
# lets the query resume where it stopped.
(spark.table("orders_bronze_tempview")
    .writeStream
    .format("delta")
    .outputMode("append")
    .options(checkpointLocation=f"{orders_output_bronze}/_checkpoint")
    .table("fact_orders_bronze"))

#### 3.2. Silver Table: Include Reference Data

In [None]:
# Re-open the bronze Delta table as a stream and expose it to SQL for the silver join.
spark.readStream.table("fact_orders_bronze").createOrReplaceTempView("orders_silver_tempview")

In [None]:
%sql
-- Live preview of the bronze stream as seen by the silver stage.
SELECT *
FROM orders_silver_tempview

In [None]:
%sql
-- Columns (including receipt_time / source_file) carried by the streaming view.
DESCRIBE TABLE EXTENDED orders_silver_tempview

In [None]:
%sql
/* Silver layer: enrich each order streamed from fact_orders_bronze with product,
   supplier, origin/destination grocery, customer and date attributes.
   - The INNER JOINs drop any order whose product_key or customer_key (or whose
     product's supplier_key / from_key / to_key) has no matching dimension row.
   - The two dim_date joins are LEFT OUTER, so orders are kept even when a date key
     has no dim_date row (those date columns are then NULL).
   - dim_grocery is joined twice (fa = origin "from", ta = destination "to") and
     dim_date twice (dd = departure, ad = arrival).
   - The SELECT list order defines the column order of fact_orders_silver.
   NOTE(review): columns such as departure_time, arrival_time and seat read like a
   reused flights template; confirm they exist in dim_product / the orders JSON. */
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT b.fact_order_key,
      b.order_id,
      b.product_key,
      f.product_num,
      -- departure attributes (date details from dim_date alias dd)
      f.departure_time,
      f.departure_date_key,
      dd.day_name_of_week AS departure_day_name_of_week,
      dd.day_of_month AS departure_day_of_month,
      dd.weekday_weekend AS departure_weekday_weekend,
      dd.month_name AS departure_month_name,
      dd.calendar_quarter AS departure_quarter,
      dd.calendar_year AS departure_year,
      -- arrival attributes (date details from dim_date alias ad)
      f.arrival_time,
      f.arrival_date_key,
      ad.day_name_of_week AS arrival_day_name_of_week,
      ad.day_of_month AS arrival_day_of_month,
      ad.weekday_weekend AS arrival_weekday_weekend,
      ad.month_name AS arrival_month_name,
      ad.calendar_quarter AS arrival_quarter,
      ad.calendar_year AS arrival_year,
      -- supplier, origin grocery (fa) and destination grocery (ta)
      f.supplier_key,
      al.supplier_name,
      f.from_key,
      fa.name AS from_grocery_name,
      fa.city AS from_grocery_city,
      fa.country AS from_grocery_country,
      f.to_key,
      ta.name AS to_grocery_name,
      ta.city AS to_grocery_city,
      ta.country AS to_grocery_country,
      -- customer attributes and order measures
      b.customer_key,
      p.first_name AS customer_first_name,
      p.last_name AS customer_last_name,
      p.sex AS customer_sex,
      p.country AS customer_country,
      b.price,
      b.seat
  FROM orders_silver_tempview AS b
  INNER JOIN grocery_dlh.dim_product AS f
  ON b.product_key = f.product_key
  INNER JOIN grocery_dlh.dim_supplier AS al
  ON f.supplier_key = al.supplier_key
  INNER JOIN grocery_dlh.dim_grocery AS fa
  ON f.from_key = fa.grocery_key
  INNER JOIN grocery_dlh.dim_grocery AS ta
  ON f.to_key = ta.grocery_key
  INNER JOIN grocery_dlh.dim_customer AS p
  ON b.customer_key = p.customer_key
  LEFT OUTER JOIN grocery_dlh.dim_date AS dd
  ON dd.date_key = f.departure_date_key
  LEFT OUTER JOIN grocery_dlh.dim_date AS ad
  ON ad.date_key = f.arrival_date_key
)

In [None]:
# Continuously append the enriched orders to the silver Delta table; the checkpoint
# lets the query resume where it stopped.
(spark.table("fact_orders_silver_tempview")
    .writeStream
    .format("delta")
    .outputMode("append")
    .options(checkpointLocation=f"{orders_output_silver}/_checkpoint")
    .table("fact_orders_silver"))

In [None]:
%sql
-- Qualified like every other reference to the silver table, so the result does not
-- depend on which database happens to be current.
SELECT * FROM grocery_dlh.fact_orders_silver

In [None]:
%sql
-- Column list plus table metadata (location, provider, properties).
DESCRIBE TABLE EXTENDED grocery_dlh.fact_orders_silver

#### 3.3. Gold Table: Perform Aggregations

In [None]:
%sql
-- Number of orders per (destination country, supplier).
-- Sorting is done when reading the table: a table does not keep the row order of the
-- query that created it, so an ORDER BY inside the CTAS did not order the SELECT below.
CREATE OR REPLACE TABLE grocery_dlh.fact_orders_by_supplier_and_destination_country AS (
  SELECT to_grocery_country AS DestinationCountry
    , supplier_name AS Supplier
    , COUNT(fact_order_key) AS OrderCount
  FROM grocery_dlh.fact_orders_silver
  GROUP BY to_grocery_country, supplier_name);

SELECT * FROM grocery_dlh.fact_orders_by_supplier_and_destination_country
ORDER BY OrderCount DESC;

In [None]:
%sql
-- Revenue per supplier from orders whose customer_country is 'United States'.
-- Sorting is done when reading the table: a table does not keep the row order of the
-- query that created it, so an ORDER BY inside the CTAS did not order the SELECT below.
CREATE OR REPLACE TABLE grocery_dlh.fact_orders_revenue_per_supplier_by_americans AS (
  SELECT supplier_name AS Supplier
      ,SUM(price) AS Revenue
  FROM grocery_dlh.fact_orders_silver
  WHERE customer_country = 'United States'
  GROUP BY supplier_name);

SELECT * FROM grocery_dlh.fact_orders_revenue_per_supplier_by_americans
ORDER BY Revenue DESC;

In [None]:
%sql
-- Revenue per (customer sex, departure day of week).
-- Sorting is done when reading the table: a table does not keep the row order of the
-- query that created it, so an ORDER BY inside the CTAS did not order the SELECT below.
CREATE OR REPLACE TABLE grocery_dlh.fact_orders_revenue_by_sex_and_departure_day AS (
  SELECT customer_sex AS Sex
      , departure_day_name_of_week AS DayOfWeek
      , SUM(price) AS Revenue
  FROM grocery_dlh.fact_orders_silver
  GROUP BY customer_sex, departure_day_name_of_week);

SELECT * FROM grocery_dlh.fact_orders_revenue_by_sex_and_departure_day
ORDER BY Revenue DESC;

### 4. Clean Up the File System

In [None]:
# Stop any streaming queries still running from section 3 before deleting the
# directories they read from and checkpoint into; otherwise they fail mid-batch.
for active_query in spark.streams.active:
    active_query.stop()

# Equivalent of `%fs rm -r /FileStore/project_data/`.
dbutils.fs.rm("/FileStore/project_data/", True)