## Final Project
### Section 1: Prerequisites
#### 1.0: Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0: Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
# Each credential/host value can be overridden through an environment variable (for example one
# set on the cluster) so the notebook runs without editing source; the literal fallbacks keep the
# original values when the variables are unset.
# NOTE(review): the plaintext fallbacks still expose the passwords to anyone who can read this
# notebook — prefer dbutils.secrets.get(...) from a Databricks secret scope.
jdbc_hostname = os.environ.get("DS2002_MYSQL_HOST", "ds2002-mysql-bsy6pq.mysql.database.azure.com")
jdbc_port = int(os.environ.get("DS2002_MYSQL_PORT", "3306"))
src_database = "sakila_dw"

connection_properties = {
  "user" : os.environ.get("DS2002_MYSQL_USER", "bsy6pq"),
  "password" : os.environ.get("DS2002_MYSQL_PASSWORD", "Passw0rd123"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# atlas_cluster_name is the host prefix; ".mongodb.net" is appended by the Mongo helpers.
atlas_cluster_name = os.environ.get("DS2002_ATLAS_CLUSTER", "ds2002.ixnz0au")
atlas_database_name = "sakila_dw"
atlas_user_name = os.environ.get("DS2002_ATLAS_USER", "bsy6pq")
atlas_password = os.environ.get("DS2002_ATLAS_PASSWORD", "Passw0rd123")

# Data Files Information ###############################
dst_database = "sakila_dlh"

# Every DBFS path below derives from base_dir, so relocating the project changes one line.
base_dir = "dbfs:/FileStore/ds2002-final-project"
database_dir = f"{base_dir}/{dst_database}"
json_dir = f"{base_dir}/json"
csv_dir = f"{base_dir}/csv"
stream_dir = f"{base_dir}/stream"

# Medallion (bronze -> silver -> gold) output locations for the orders fact stream.
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

#### 3.0: Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Args:
        user_id: Atlas user name (percent-escaped before being placed in the URI).
        pwd: Atlas password (percent-escaped before being placed in the URI).
        cluster_name: Cluster host prefix; ".mongodb.net" is appended.
        db_name: Database to query.
        collection: Collection name inside ``db_name``.
        conditions: Filter document for ``find``; a falsy value matches every document.
        projection: Projection document for ``find``; a falsy value returns all fields.
        sort: Sort specification passed to ``Cursor.sort``; a falsy value leaves results unsorted.

    Returns:
        ``pd.DataFrame`` built from the fetched documents (``pd`` is pyspark.pandas in this notebook).
    '''
    # Local import keeps this helper self-contained; quote_plus escapes characters such as
    # '@' or ':' that would otherwise corrupt the connection URI.
    from urllib.parse import quote_plus

    mongo_uri = f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each query option independently: a filter without a projection, or a sort
        # without a filter, must still take effect.
        # `projection or None` avoids passing an empty dict, which would not mean "all fields".
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor before the client is closed below.
        documents = list(cursor)
    finally:
        client.close()

    return pd.DataFrame(documents)

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Replace MongoDB Atlas collections with the contents of local JSON files.

    Args:
        user_id: Atlas user name (percent-escaped before being placed in the URI).
        pwd: Atlas password (percent-escaped before being placed in the URI).
        cluster_name: Cluster host prefix; ".mongodb.net" is appended.
        db_name: Target database.
        src_file_path: Local directory that holds the JSON files.
        json_files: Mapping of collection name -> JSON file name inside ``src_file_path``.
            Each named collection is dropped and then re-created from its file.

    Returns:
        The result of ``insert_many`` for the last collection loaded, or ``None`` when
        ``json_files`` is empty.
    '''
    # Local import keeps this helper self-contained; quote_plus escapes characters such as
    # '@' or ':' that would otherwise corrupt the connection URI.
    from urllib.parse import quote_plus

    mongo_uri = f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    result = None  # stays None if there is nothing to load
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_file = os.path.join(src_file_path, file_name)
            with open(json_file, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)
            # insert_many needs a list of documents; accept a file holding a single object too.
            if isinstance(documents, dict):
                documents = [documents]
            result = db[collection_name].insert_many(documents)
    finally:
        client.close()

    return result

### Section 2: Populate Dimensions by Ingesting Reference (Cold-Path) Data
#### 1.0: Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Drop the metadata database (CASCADE also drops every table inside it) so the rebuild starts clean.
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Recreate the metadata database rooted at the project's DBFS folder.
CREATE DATABASE IF NOT EXISTS sakila_dlh
  COMMENT "DS-2002: Final Project Database"
  LOCATION "dbfs:/FileStore/ds2002-final-project/sakila_dlh"
  WITH DBPROPERTIES (
    contains_pii = true,
    purpose = "DS-2002: Final Project"
  )

##### 1.2: Create a New Table that Sources Date Dimension Data from an Azure MySQL Database

In [0]:
%sql
-- Expose the MySQL sakila_dw.dim_date table to Spark through a JDBC-backed temporary view.
-- NOTE(review): these credentials duplicate the Python config cell; consider a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url      "jdbc:mysql://ds2002-mysql-bsy6pq.mysql.database.azure.com:3306/sakila_dw",
  dbtable  "dim_date",
  user     "bsy6pq",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Copy the rows exposed by view_date into a table stored at an explicit DBFS location.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
  COMMENT "Date Dimension Table"
  LOCATION "dbfs:/FileStore/ds2002-final-project/sakila_dlh/dim_date"
AS
SELECT * FROM view_date

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE EXTENDED sakila_dlh.dim_date;

In [0]:
%sql
-- Spot-check a few rows.
SELECT *
FROM sakila_dlh.dim_date
LIMIT 5

##### 1.3: Create a New Table that Sources Rental Dimension Data from an Azure MySQL Database

In [0]:
%sql
-- Expose the MySQL sakila_dw.dim_rentals table to Spark through a JDBC-backed temporary view.
-- NOTE(review): these credentials duplicate the Python config cell; consider a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_rental
USING org.apache.spark.sql.jdbc
OPTIONS (
  url      "jdbc:mysql://ds2002-mysql-bsy6pq.mysql.database.azure.com:3306/sakila_dw",
  dbtable  "dim_rentals",
  user     "bsy6pq",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Copy the rows exposed by view_rental into a table stored at an explicit DBFS location.
CREATE OR REPLACE TABLE sakila_dlh.dim_rental
  COMMENT "Rental Dimension Table"
  LOCATION "dbfs:/FileStore/ds2002-final-project/sakila_dlh/dim_rental"
AS
SELECT * FROM view_rental

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE EXTENDED sakila_dlh.dim_rental

In [0]:
%sql
-- Spot-check a few rows.
SELECT *
FROM sakila_dlh.dim_rental
LIMIT 5

#### 2.0: Fetch Reference Data from a MongoDB Atlas Database
##### 2.1: View the Data Files on the Databricks File System

In [0]:
# List the JSON source files staged on DBFS.
json_dir_listing = dbutils.fs.ls(json_dir)
display(json_dir_listing)

##### 2.2: Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
# set_mongo_collection reads files with Python's built-in open(), which needs the local /dbfs/...
# (FUSE) form of the path rather than a dbfs:/ URI. Derive it from json_dir so the two stay in sync.
source_dir = json_dir.replace("dbfs:/", "/dbfs/", 1)
json_file = {"payment" : "payment_p2007_01.json"}  # collection name -> JSON file name

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_file)

##### 2.3.1: Fetch Payment Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Read the "payment" collection into a Spark DataFrame, keeping only the payment-dimension columns.
// NOTE(review): no connection URI is set in this cell, so this presumably relies on the cluster's
// MongoDB Spark connector configuration — confirm.
val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila_dw").option("collection", "payment").load()
.select("payment_id", "customer_id", "amount", "payment_date")

// Preview the DataFrame built above (df_actor is never defined in this notebook).
display(df_payment)

In [0]:
%scala
df_payment.printSchema()

##### 2.3.2: Use the Spark DataFrame to Create a New Payment Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the payment DataFrame as sakila_dlh.dim_payment, replacing any earlier version.
df_payment
  .write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE EXTENDED sakila_dlh.dim_payment

In [0]:
%sql
-- Spot-check a few rows.
SELECT *
FROM sakila_dlh.dim_payment
LIMIT 5

#### 3.0: Fetch Data from a File System
##### 3.1: Use PySpark to Read City Dimension Data From a CSV File

In [0]:
city_csv = f"{csv_dir}/city.csv"

df_city = spark.read.format('csv').options(header='true', inferSchema='true').load(city_csv)
display(df_city)

In [0]:
df_city.printSchema()

In [0]:
df_city.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_city")

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE EXTENDED
  sakila_dlh.dim_city;

##### 3.2: Verify Dimension Tables

In [0]:
%sql
-- Switch to the metadata database, then list its tables to confirm every dimension was created.
USE sakila_dlh;

SHOW TABLES

#### 4.0: Use AutoLoader to Process Orders Fact Data
##### 4.1: Bronze Table - Process 'Raw' JSON Data

In [0]:
# Incrementally ingest the raw order JSON files in stream_dir with Auto Loader and expose
# the stream as a temporary view for the bronze step.
#
# Schema hints: repeated .option() calls with the same key overwrite one another, so the former
# chain of "cloudFiles.schemaHints" options kept only the last value ("phone_number"). A hint
# must also name a type ("column TYPE"), so bare column names are not usable hints.
# Column types are left to inference here (cloudFiles.inferColumnTypes). To pin a column, pass
# ONE comma-separated string, e.g. .option("cloudFiles.schemaHints", "postal_code STRING").
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", orders_output_bronze)  # where Auto Loader tracks the inferred schema
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")  # a JSON record may span several lines
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview")
 )

In [0]:
%sql
-- Bronze: keep every raw column and stamp each row with the processing time and its source file.
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT
    *,
    current_timestamp() AS receipt_time,
    input_file_name()   AS source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT *
FROM orders_bronze_tempview

In [0]:
# Append the bronze stream to fact_orders_bronze; the checkpoint directory records stream
# progress so a restarted query resumes instead of starting over.
(
    spark.table("orders_bronze_tempview")
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
    .outputMode("append")
    .table("fact_orders_bronze")
)

##### 4.2: Silver Table: Include Reference Data

In [0]:
# Read the bronze table back as a stream and expose it to SQL for the silver step.
(
    spark.readStream
    .table("fact_orders_bronze")
    .createOrReplaceTempView("orders_silver_tempview")
)

In [0]:
%sql
-- Preview the streamed bronze rows.
SELECT *
FROM orders_silver_tempview

In [0]:
%sql
-- Show the streamed view's columns together with its extended metadata.
DESCRIBE EXTENDED
  orders_silver_tempview

In [0]:
%sql
-- Silver: enrich each streamed order with rental, payment and city reference data.
-- NOTE(review): rentals and payments are joined on customer alone, so an order row is repeated once
-- per (rental, payment) pair belonging to that customer — confirm this fan-out is intended.
-- NOTE(review): dim_city is joined on address_id = o.address; confirm city.csv provides address_id.
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key
  , o.customer
  , o.store
  , o.address
  , o.city
  , o.customer_last_name
  , o.customer_first_name
  , o.customer_email
  , o.district
  , o.postal_code
  , o.phone_number
  , r.rental_key
  , r.inventory
  , r.rental_date
  , r.return_date
  , p.payment_id
  , p.amount
  , p.payment_date
  , c.city_id
  -- Aliased because o.city is already selected; the view cannot expose two columns named "city".
  , c.city AS city_name
  FROM orders_silver_tempview AS o
  INNER JOIN sakila_dlh.dim_rental AS r
  ON r.customer = o.customer
  INNER JOIN sakila_dlh.dim_payment AS p
  ON p.customer_id = o.customer
  INNER JOIN sakila_dlh.dim_city AS c
  ON c.address_id = o.address
)

In [0]:
# Append the enriched (silver) stream to fact_orders_silver.
# DataStreamWriter.option() needs both a key and a value — the bare .option("checkpointLocation")
# raised a TypeError before the stream could start. The checkpoint lives under the silver output
# path, mirroring the bronze stream's f"{orders_output_bronze}/_checkpoint".
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

In [0]:
%sql
-- Preview the rows written by the silver stream.
SELECT *
FROM fact_orders_silver

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE EXTENDED
  fact_orders_silver

##### 4.3: Gold Table: Perform Aggregations

In [0]:
%sql
-- Gold: sum payment amounts per customer/address/payment-date group, largest totals first.
-- Reads fact_orders_silver, the table written by the silver stream (there is no fact_inventory_silver).
-- Every non-aggregated column in SELECT must also appear in GROUP BY; grouping by district alone
-- is rejected.
-- NOTE(review): payment_date is part of the grouping key, so totals are split per payment date;
-- drop it from SELECT and GROUP BY to get one total per customer.
-- NOTE(review): the silver view joins rentals and payments on customer alone, so an amount can be
-- repeated across rental rows — confirm before relying on these totals.
SELECT customer
    , customer_last_name
    , customer_first_name
    , customer_email
    , SUM(amount) AS total_amount
    , address
    , city
    , district
    , postal_code
    , payment_date
FROM sakila_dlh.fact_orders_silver
GROUP BY customer
    , customer_last_name
    , customer_first_name
    , customer_email
    , address
    , city
    , district
    , postal_code
    , payment_date
ORDER BY total_amount DESC