## Final Project DS 2002
#### Tuan-Minh Tran

Data being used from sakila database

### 1.0. Import Required Libraries

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

### Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "bat3pf.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila" ## i used the sakila database

connection_properties = {
  "user" : "bat3pf",
  "password" : "Okmijn1u!321",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.qazws"
atlas_database_name = "sakila"
atlas_user_name = "bat3pf"
atlas_password = "Okmijn1u!321"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/final_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

rentals_stream_dir = f"{stream_dir}/rentals"

rentals_output_bronze = f"{database_dir}/fact_rentals/bronze"
rentals_output_silver = f"{database_dir}/fact_rentals/silver"
rentals_output_gold   = f"{database_dir}/fact_rentals/gold"

#dbutils.fs.rm(f"{database_dir}/fact_rentals", True)
#dbutils.fs.rm(database_dir, True)

In [None]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"

    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()

    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]

    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()

    return result

### Populate Dimensions by Ingesting Reference (Cold-path) Data
#### Fetch Reference Data From an Azure MySQL Database
##### Create a New Databricks Metadata Database.

In [None]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://bat3pf.mysql.database.azure.com:3306/sakila",
  dbtable "dim_date",
  user "bat3pf",
  password "Okmijn1u!321"
)

In [None]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [None]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_inventory
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://bat3pf.mysql.database.azure.com:3306/sakila",
  dbtable "inventory",
  user "bat3pf",
  password "Okmijn1u!321"
)

In [None]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.inventory
COMMENT "Payment Table"
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/inventory"
AS SELECT * FROM view_inventory

num_affected_rows,num_inserted_rows


In [None]:
%sql
SELECT * FROM sakila_dlh.inventory LIMIT 5

inventory_id,film_id,store_id
1,1,1
2,1,1
3,1,1
4,1,1
5,1,2


In [None]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://bat3pf.mysql.database.azure.com:3306/sakila",
  dbtable "customer",
  user "bat3pf",
  password "Okmijn1u!321"
)

In [None]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.customer
COMMENT "Customers Table"
COMMENT "Inventory Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/inventory"
AS SELECT * FROM view_customer

In [None]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [None]:
%sql
SELECT * FROM sakila_dlh.customer LIMIT 5

#### Create a New Table that Fetches Data from MongoDB Atlas Database

In [None]:
display(dbutils.fs.ls(batch_dir))

In [None]:
source_dir = '/dbfs/FileStore/lab_data/final_data/batch'
json_files = {"film" : 'sakila_film.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name,
                     atlas_database_name, source_dir, json_files)

Out[16]: <pymongo.results.InsertManyResult at 0x7fbeb073b980>

In [None]:
%scala
import com.mongodb.spark._

val userName = "bat3pf"
val pwd = "Okmijn1u!321"
val clusterName = "cluster0.qazws"
val atlas_uri = "mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [None]:
%scala
import com.mongodb.spark._

val df_film= spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("database", "sakila").option("collection", "film").option("uri", atlas_uri).load()

#display(df_film)

customer_id,store_id,last_name,first_name,active
1,1,SMITH,MARY,1
2,1,JOHNSON,PATRICIA,1
3,1,WILLIAMS,LINDA,1
4,2,JONES,BARBARA,1
5,1,BROWN,ELIZABETH,1
6,2,DAVIS,JENNIFER,1
7,1,MILLER,MARIA,1
8,2,WILSON,SUSAN,1
9,2,MOORE,MARGARET,1
10,1,TAYLOR,DOROTHY,1


In [None]:
%scala
df_customer.printSchema()

In [None]:
%scala
df_film.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.film")

In [None]:
%sql
SELECT * FROM sakila_dlh.film LIMIT 5

### Fetch Data from a File System - Using PySpark to read info from a CSV

In [None]:
#fetch the rentals table

rental_csv = f"{batch_dir}/sakila_rental.csv"

df_rental = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load(rental_csv)
display(df_rental)

In [None]:
%scala
df_rental.printSchema()

In [None]:
df_rental.write.format('delta').mode('overwrite').saveAsTable('sakila_dlh.rental')

In [None]:
%sql
SELECT * FROM sakila_dlh.rental LIMIT 5;

In [None]:
df_rental.write.format('delta').mode('overwrite').saveAsTable('sakila_dlh.rental')

In [None]:
%sql
SELECT * FROM sakila_dlh.rental LIMIT 5;

In [None]:
#fetch the fact orders table

fact_orders_csv = f"{batch_dir}/sakila_fact_table.csv"

df_fact_orders = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load(fact_orders_csv)
display(df_fact_orders)

In [None]:
df_fact_orders.write.format('delta').mode('overwrite').saveAsTable('sakila_dlh.fact_orders')

In [None]:
%sql
SELECT * FROM sakila_dlh.rental LIMIT 5;

payment_id,customer_id,rental_id,amount,payment_date
1,1,76,2.99,2005-05-25 11:30:37
2,1,573,0.99,2005-05-28 10:35:23
3,1,1185,5.99,2005-06-15 00:54:12
4,1,1422,0.99,2005-06-15 18:02:53
5,1,1476,9.99,2005-06-15 21:08:46


Verify the tables exist:



In [None]:
%sql
USE sakila_dlh;
SHOW TABLES

### Integrate Reference Data with Real-Time Data
using sakila.payment as the current streaming table

bronze table

In [None]:
%scala
df_rental.printSchema()

In [None]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "payment_id ")
 .option("cloudFiles.schemaHints", "customer_id BIGINT")
 .option("cloudFiles.schemaHints", "staff_id BIGINT")
 .option("cloudFiles.schemaHints", "rental BIGINT")
 .option("cloudFiles.schemaHints", "amount DECIMAL")
 .option("cloudFiles.schemaLocation", payment_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(payment_stream_dir)
 .createOrReplaceTempView("payment_raw_tempview"))

In [None]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW payment_output_bronze AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM payment_raw_tempview
)

In [None]:
%sql
SELECT * FROM payment_output_bronze LIMIT 2

In [None]:
(spark.table("payment_output_bronze")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{payment_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

In [None]:
%sql
SELECT * FROM fact_orders_bronze LIMIT 2

silver table

In [None]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("fact_orders_silver_tempview"))

In [None]:
%sql
SELECT * FROM fact_orders_silver_tempview LIMIT 2;

In [None]:
%sql
-- joining together
CREATE OR REPLACE TEMPORARY VIEW cust_pay AS(
SELECT c.customer_id,
c.store_id,
c.first_name,
c.last_name,
c.email,
--payment
p.payment_id,
p.amount,
p.last_update,
p.rental_id,
p.staff_id,
p.source_file,
--rental
r.inventory_id,
-- inventory
i.film_id,
--film
f.description,
f.length,
f.rating,
f.release_year,
f.title,
f.rental_rate
FROM sakila_dlh.customer AS c
JOIN fact_orders_silver_tempview AS p
ON c.customer_id = p.customer_id
JOIN sakila_dlh.rental AS r
ON r.rental_id = p.rental_id
JOIN sakila_dlh.inventory AS i
ON r.inventory_id = i.inventory_id
JOIN sakila_dlh.film AS f
ON f.film_id = i.film_id
)

In [None]:
%sql
SELECT * FROM cust_pay LIMIT 2

In [None]:
(spark.table("cust_pay")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{payment_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_inventory_transactions_silver"))

In [None]:
%sql
SELECT * FROM fact_inventory_transactions_silver LIMIT 2

gold table - data table aggregations

In [None]:
%sql
SELECT release_year AS movie_release_year
    , rating AS movie_rating
    , AVG(rental_rate) AS average_rental_rate
FROM sakila_dlh.fact_inventory_transactions_silver
GROUP BY movie_release_year, movie_rating
ORDER BY movie_release_year DESC