## Data Science 2002 Capstone - Anran Zhao (nrb6yu)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
# TODO(security): credentials belong in a Databricks secret scope (dbutils.secrets.get),
# not in the notebook. The environment variables below let them be supplied without
# editing this cell; the literal fallbacks only preserve the current behavior.
jdbc_hostname = "nrb6yu-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : os.environ.get("MYSQL_USER", "azhao"),
  "password" : os.environ.get("MYSQL_PASSWORD", "Passw0rd123"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.mvvgsd3"
atlas_database_name = "sakila"
atlas_user_name = os.environ.get("ATLAS_USER_NAME", "nrb6yu")
atlas_password = os.environ.get("ATLAS_PASSWORD", "Passw0rd1234")

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sakila"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Streaming (hot-path) paths. The Section III cells refer to these as rentals_*;
# the older orders_* names are kept as aliases so any cell still using them works.
rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

orders_stream_dir = rentals_stream_dir
orders_output_bronze = rentals_output_bronze
orders_output_silver = rentals_output_silver
orders_output_gold = rentals_output_gold

# Delete the Database Files ###################################
# Recursive delete of database_dir; this also covers the streaming output under
# {database_dir}/fact_rentals, so no separate delete is needed for it.
dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Fetch documents from a MongoDB Atlas collection into a DataFrame.

    Parameters:
        user_id, pwd: Atlas credentials. They are percent-escaped before being
            embedded in the connection URI.
        cluster_name: Atlas cluster host prefix (e.g. "sandbox.mvvgsd3").
        db_name: database to query.
        collection: collection name within ``db_name``.
        conditions: filter passed to ``find``; falsy means "all documents".
        projection: projection passed to ``find``; falsy means "all fields".
        sort: sort specification passed to ``Cursor.sort``; falsy means unsorted.

    Returns:
        ``pd.DataFrame`` built from the list of matching documents
        (``pd`` is pyspark.pandas in this notebook).
    '''
    from urllib.parse import quote_plus

    # Reserved characters (':', '@', '/', ...) in credentials would corrupt the URI.
    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each argument independently. The previous if/elif chain silently
        # ignored `conditions` unless `projection` was also given, and ignored
        # `sort` unless both were given.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        documents = list(cursor)
    finally:
        # Release the client even when the query raises.
        client.close()

    return pd.DataFrame(documents)

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files.

    For each ``collection_name -> file_name`` entry in ``json_files``, the
    collection is dropped and then repopulated from
    ``os.path.join(src_file_path, file_name)``.

    Parameters:
        user_id, pwd: Atlas credentials (percent-escaped before use in the URI).
        cluster_name: Atlas cluster host prefix.
        db_name: target database.
        src_file_path: local directory containing the JSON files.
        json_files: mapping of collection name to JSON file name.

    Returns:
        The ``insert_many`` result for the last collection loaded. Returns ``None``
        when ``json_files`` is empty or the last file held no documents.
    '''
    from urllib.parse import quote_plus

    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    result = None  # previously unbound (UnboundLocalError) when json_files was empty
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            # A file holding a single JSON object becomes a one-document list;
            # insert_many() rejects an empty list, so skip the insert in that case.
            if isinstance(documents, dict):
                documents = [documents]
            result = db[collection_name].insert_many(documents) if documents else None
    finally:
        # Release the client even if a file is missing or an insert fails.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: remove the sakila_dlh metadata database and, via CASCADE, every table in it.
DROP SCHEMA IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Register the lakehouse database at an explicit DBFS location, tagged with
-- descriptive properties (it is flagged as containing PII).
CREATE SCHEMA IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Capstone"
LOCATION "dbfs:/FileStore/data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database. 

In [0]:
%sql
-- Expose the MySQL dim_date table to Spark as a temporary view over JDBC.
-- NOTE(review): the source schema is northwind_dw2, not sakila; confirm the shared
-- date dimension is meant to come from there.
-- TODO(security): the password is hard-coded; prefer a Databricks secret.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING jdbc
OPTIONS (
  url "jdbc:mysql://nrb6yu-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_date",
  user "azhao",
  password "Passw0rd123"
)

In [0]:
%sql
USE sakila_dlh;

-- Materialize the JDBC-backed view_date as the date dimension, stored at an explicit DBFS path.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/data/sakila_dlh/dim_date"
AS SELECT vd.* FROM view_date AS vd

In [0]:
%sql
-- Show the date dimension's columns plus its table-level metadata.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_date;

In [0]:
%sql
-- Preview a handful of date-dimension rows.
SELECT d.* FROM sakila_dlh.dim_date AS d LIMIT 5

##### 1.3. Create a New Table that Sources Inventory Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a temporary view named "view_product" that reads the dim_inventory table
-- from the MySQL server over JDBC.
-- Fixed: the data source was misspelled "org.apche.spark.sql.jdbc", and the OPTIONS
-- list was missing a comma after the user option; both made the statement fail.
-- NOTE(review): the source schema is northwind_dw2 (same as view_date); confirm.
-- TODO(security): the password is hard-coded; prefer a Databricks secret.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://nrb6yu-mysql.mysql.database.azure.com:3306/northwind_dw2",
  dbtable "dim_inventory",
  user "azhao",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create the table "sakila_dlh.dim_inventory" from the view named "view_product".
-- Fixed: previously targeted northwind_dlh, a database this notebook never creates.
-- NOTE(review): section 2.3.2 later writes sakila_dlh.dim_inventory again from the
-- MongoDB inventory collection; confirm which source should own this dimension.
CREATE OR REPLACE TABLE sakila_dlh.dim_inventory
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/data/sakila_dlh/dim_inventory"
AS SELECT * FROM view_product

In [0]:
%sql
-- Inspect the inventory dimension built in section 1.3.
-- (Previously referenced sakila_dlh.dim_product, which no cell creates.)
DESCRIBE EXTENDED sakila_dlh.dim_inventory;

In [0]:
%sql
-- Preview the inventory dimension built in section 1.3.
-- (Previously referenced sakila_dlh.dim_product, which no cell creates.)
SELECT * FROM sakila_dlh.dim_inventory LIMIT 5

#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the cold-path source files staged in batch_dir (dbfs:/FileStore/data/sakila/batch).
batch_files = dbutils.fs.ls(batch_dir)
display(batch_files)

##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# set_mongo_collection() reads files with open(), so it needs the "/dbfs/..." path form.
# Derive it from batch_dir in the config cell instead of repeating the literal path.
source_dir = batch_dir.replace("dbfs:", "/dbfs", 1)  # -> '/dbfs/FileStore/data/sakila/batch'
json_files = {"inventory" : 'inventory.json'}  # collection name -> JSON file name

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

##### 2.3.1. Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// MongoDB Atlas connection settings for the Scala cells (mirrors the Python config cell).
// TODO(security): move these to a Databricks secret scope. The environment variables
// allow overriding them without editing the notebook; the literal fallbacks keep the
// current behavior.
val userName = sys.env.getOrElse("ATLAS_USER_NAME", "nrb6yu")
val pwd = sys.env.getOrElse("ATLAS_PASSWORD", "Passw0rd1234")
val clusterName = "sandbox.mvvgsd3"

// Escape the credentials so reserved characters (':', '@', '/') cannot corrupt the URI.
val encodeCredential = (s: String) => java.net.URLEncoder.encode(s, "UTF-8")
val atlas_uri = s"mongodb+srv://${encodeCredential(userName)}:${encodeCredential(pwd)}@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

// Load the "inventory" collection from MongoDB Atlas through the MongoDB Spark
// connector and keep only the inventory dimension's columns.
val df_inventory = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .options(Map(
    "spark.mongodb.input.uri" -> atlas_uri,
    "database" -> "sakila",
    "collection" -> "inventory"))
  .load()
  .select("inventory_key", "inventory_id", "film_id", "store_id")

display(df_inventory)

In [0]:
%scala
// Print the inventory DataFrame's schema as a tree.
df_inventory.schema.printTreeString()

##### 2.3.2. Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the MongoDB inventory DataFrame as the Delta table sakila_dlh.dim_inventory,
// replacing any prior contents.
// Fixed: previously wrote df_customer, which no cell defines; df_inventory is the
// DataFrame loaded in section 2.3.1.
df_inventory.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
-- Show the inventory dimension's columns and table metadata.
DESC TABLE EXTENDED sakila_dlh.dim_inventory

In [0]:
%sql
-- Preview a few inventory-dimension rows.
SELECT inv.* FROM sakila_dlh.dim_inventory AS inv LIMIT 5

#### 3.0. Fetch Staff Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Read staff reference data from the batch CSV: the header row supplies column
# names and Spark infers the column types.
staff_csv = f"{batch_dir}/staff.csv"
df_staff = spark.read.csv(staff_csv, header=True, inferSchema=True)

display(df_staff)

In [0]:
# Print the staff columns and the types Spark inferred (inferSchema=True) from staff.csv.
df_staff.printSchema()

In [0]:
# Persist the staff DataFrame as the Delta dimension table sakila_dlh.dim_staff, replacing prior contents.
df_staff.write.saveAsTable("sakila_dlh.dim_staff", format="delta", mode="overwrite")

In [0]:
%sql
-- Show the staff dimension's columns and table metadata.
DESC TABLE EXTENDED sakila_dlh.dim_staff;

In [0]:
%sql
-- Preview a few staff-dimension rows.
SELECT s.* FROM sakila_dlh.dim_staff AS s LIMIT 5;

##### Verify Dimension Tables

In [0]:
%sql
-- Make sakila_dlh the current database (later cells use unqualified table names
-- such as fact_rentals_bronze), then list the tables created so far.
USE sakila_dlh;
SHOW TABLES IN sakila_dlh

### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Rentals Fact Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Incrementally ingest the raw rental JSON files with Auto Loader (cloudFiles),
# inferring column types and tracking the inferred schema under the bronze output path.
rentals_raw_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", rentals_output_bronze)
    .option("multiLine", "true")
    .load(rentals_stream_dir)
)
rentals_raw_stream.createOrReplaceTempView("rentals_raw_tempview")

In [0]:
%sql
/* Tag every raw record with the time it was processed and the file it came from,
   so bronze rows can be traced back to their source. */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS
SELECT
  raw.*,
  current_timestamp() AS receipt_time,
  input_file_name() AS source_file
FROM rentals_raw_tempview AS raw

In [0]:
%sql
-- Display the bronze stream, including the traceability columns.
SELECT b.* FROM rentals_bronze_tempview AS b

In [0]:
# Stream the bronze view into the Delta table fact_rentals_bronze in append mode,
# storing the streaming checkpoint under the bronze output directory.
# toTable() is the documented PySpark DataStreamWriter method for streaming into
# a table (Spark 3.1+). The open-source PySpark DataStreamWriter has no .table().
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .toTable("fact_rentals_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Re-read the bronze Delta table as a stream and expose it to SQL for the silver step.
rentals_bronze_stream = spark.readStream.table("fact_rentals_bronze")
rentals_bronze_stream.createOrReplaceTempView("rentals_silver_tempview")

In [0]:
%sql
-- Display the bronze records that feed the silver step.
SELECT src.* FROM rentals_silver_tempview AS src

In [0]:
%sql
-- Show the columns of the streaming view that feeds the silver step.
DESC EXTENDED rentals_silver_tempview

In [0]:
%sql
-- Enrich each streamed rental with date, inventory, customer and staff dimension attributes.
-- Fixed:
--   * the first column used an undefined alias "o" (now "r");
--   * commas were missing after r.inventory_key and inventory_film_id;
--   * a trailing comma preceded FROM;
--   * the joins named sakila_dlh.inventory/customer/staff instead of the dim_* tables.
-- TODO: no cell in this notebook creates sakila_dlh.dim_customer, so the customer
-- join fails until that dimension is loaded.
-- NOTE(review): staff_password copies a credential column into the fact table;
-- confirm it is really needed downstream.
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.fact_rental_key,
    r.rental_id,
    r.rental_date_key,
    rd.day_name_of_week AS rental_day_name_of_week,
    rd.day_of_month AS rental_day_of_month,
    rd.weekday_weekend AS rental_weekday_weekend,
    rd.month_name AS rental_month_name,
    rd.calendar_quarter AS rental_quarter,
    rd.calendar_year AS rental_year,
    r.return_date_key,
    red.day_name_of_week AS return_day_name_of_week,
    red.day_of_month AS return_day_of_month,
    red.weekday_weekend AS return_weekday_weekend,
    red.month_name AS return_month_name,
    red.calendar_quarter AS return_quarter,
    red.calendar_year AS return_year,
    r.inventory_key,
    i.store_id AS inventory_store_id,
    i.film_id AS inventory_film_id,
    r.customer_key,
    c.first_name AS customer_first_name,
    c.last_name AS customer_last_name,
    c.email AS customer_email,
    r.staff_key,
    s.first_name AS staff_first_name,
    s.last_name AS staff_last_name,
    s.email AS staff_email,
    s.username AS staff_username,
    s.password AS staff_password
  FROM rentals_silver_tempview AS r
  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
  ON rd.date_key = r.rental_date_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS red
  ON red.date_key = r.return_date_key
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON i.inventory_key = r.inventory_key
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = r.customer_key
  INNER JOIN sakila_dlh.dim_staff AS s
  ON s.staff_key = r.staff_key
)

In [0]:
# Stream the enriched silver view into the Delta table fact_rentals_silver in append
# mode, storing the streaming checkpoint under the silver output directory.
# toTable() is the documented PySpark DataStreamWriter method for streaming into
# a table (Spark 3.1+). The open-source PySpark DataStreamWriter has no .table().
(spark.table("fact_rentals_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
      .outputMode("append")
      .toTable("fact_rentals_silver"))

In [0]:
%sql
-- Display the enriched silver fact rows.
SELECT fs.* FROM fact_rentals_silver AS fs

In [0]:
%sql
-- Show the silver fact table's columns and table metadata.
DESC TABLE EXTENDED sakila_dlh.fact_rentals_silver

##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Gold aggregate: total rentals per calendar year, alongside the average of those
-- yearly totals across all years; newest year first.
CREATE OR REPLACE TABLE sakila_dlh.fact_rentals_gold AS
WITH rentals_per_year AS (
    SELECT
        rd.calendar_year AS Year,
        COUNT(r.rental_id) AS TotalRentals
    FROM sakila_dlh.fact_rentals_silver AS r
    INNER JOIN sakila_dlh.dim_date AS rd
        ON rd.date_key = r.rental_date_key
    GROUP BY rd.calendar_year
)
SELECT
    Year,
    TotalRentals,
    AVG(TotalRentals) OVER () AS AverageRentalsPerYear
FROM rentals_per_year
ORDER BY Year DESC;

SELECT * FROM sakila_dlh.fact_rentals_gold;


#### 7.0. Clean up the File System

In [0]:
# Recursively remove /FileStore/lab_data/.
# NOTE(review): no other cell in this notebook reads or writes /FileStore/lab_data/;
# the notebook's outputs live under dbfs:/FileStore/data/sakila_dlh. Confirm this
# is the intended cleanup target.
dbutils.fs.rm("/FileStore/lab_data/", True)