###Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

###Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "shk5yd-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "hannahabele",
  "password" : "Jennifer123!@#",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "hannahacluster.gjgoj"
atlas_database_name = "sakila"
atlas_user_name = "hcabele"
atlas_password = "Claire123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore/ds2002-final"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/data"
batch_dir = f"{base_dir}/data/batch"
stream_dir = f"{base_dir}/data/stream"

output_bronze = f"{database_dir}/fact_sales/bronze"
output_silver = f"{database_dir}/fact_sales/silver"
output_gold   = f"{database_dir}/fact_sales/gold"


# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

###Define Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

###USE THIS FUNCTION
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the AZURE MySQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe

### Populate Dimensions
#### Create New Databricks Metadatabase

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Database"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final");

#### Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://shk5yd-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_date",
  user "hannahabele",    --Replace with your User Name
  password "Jennifer123!@#"  --Replace with you password
)

####Getting dim_date from DBFS folder

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


#####Ensuring the Table was Created by Selecting all and Describing Extended

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,tinyint,
day_name_of_week,varchar(10),
day_of_month,tinyint,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


####Create a New Table that Sources Product Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Create a Temporary View named "view_customer" that extracts data from your MySQL Northwind database.
CREATE OR REPLACE TEMPORARY VIEW view_customer
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://shk5yd-mysql.mysql.database.azure.com:3306/sakila_dw", --Replace with your Server Name
  dbtable "dim_customer",
  user "hannahabele",    
  password "Jennifer123!@#"  )

#####Getting Customer Dimension Table from MySQL

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Create a new table named "sakila_dlh.dim_customer" using data from the view named "view_customer"

CREATE OR REPLACE TABLE sakila_dlh.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-final/sakila_dlh/dim_customer"
AS SELECT * FROM view_customer

num_affected_rows,num_inserted_rows


#####Ensuring the Table was Created by Selecting all and Describing Extended

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer;

col_name,data_type,comment
customer_key,bigint,
customer_id,bigint,
store_id,bigint,
first_name,varchar(65535),
last_name,varchar(65535),
email,varchar(65535),
address_id,bigint,
create_date,timestamp,
,,
# Delta Statistics Columns,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,customer_id,store_id,first_name,last_name,email,address_id,create_date
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,2006-02-14T22:04:36Z
2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,2006-02-14T22:04:36Z
3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,2006-02-14T22:04:36Z
4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,2006-02-14T22:04:36Z
5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,2006-02-14T22:04:36Z


###Fetch Reference Data from a MongoDB Atlas Database
####View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir)) 

path,name,size,modificationTime
dbfs:/FileStore/ds2002-final/data/batch/sakila_store.csv,sakila_store.csv,101,1733412182000


####Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

In [0]:
source_dir = '/dbfs/FileStore/ds2002-final/'
json_files = {"payment" : 'sakila_payment.json'
              }

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

<pymongo.results.InsertManyResult at 0x7f02d2757ec0>

####Fetch Payment Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val userName = "hcabele"
val pwd = "Claire123"
val clusterName = "hannahacluster.gjgoj"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala

val df_payment = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "payment").load()
.select("payment_id","customer_id","staff_id", "rental_id", "amount", "payment_date")

display(df_payment)

payment_id,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,76,2.99,2005-05-25 11:30:37
2,1,1,573,0.99,2005-05-28 10:35:23
3,1,1,1185,5.99,2005-06-15 00:54:12
4,1,2,1422,0.99,2005-06-15 18:02:53
5,1,2,1476,9.99,2005-06-15 21:08:46
6,1,1,1725,4.99,2005-06-16 15:18:57
7,1,1,2308,4.99,2005-06-18 08:41:48
8,1,2,2363,0.99,2005-06-18 13:33:59
9,1,1,3284,3.99,2005-06-21 06:24:45
10,1,2,4526,5.99,2005-07-08 03:17:05


In [0]:
%scala
df_payment.printSchema()

####Use the Spark DataFrame to Create a New Payment Dimension Table in the Databricks Metadata Database

In [0]:
%scala
df_payment.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_payment")

#####Ensuring the Table was Created by Selecting all and Describing Extended

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_payment

col_name,data_type,comment
payment_id,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,string,
,,
# Delta Statistics Columns,,
Column Names,"customer_id, rental_id, payment_date, amount, payment_id, staff_id",
Column Selection Method,first-32,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_payment LIMIT 5

payment_id,customer_id,staff_id,rental_id,amount,payment_date
1,1,1,76,2.99,2005-05-25 11:30:37
2,1,1,573,0.99,2005-05-28 10:35:23
3,1,1,1185,5.99,2005-06-15 00:54:12
4,1,2,1422,0.99,2005-06-15 18:02:53
5,1,2,1476,9.99,2005-06-15 21:08:46


###Fetch Data from a File System
####Use PySpark to Read From a CSV File in DBFS

In [0]:
store_csv = f"{batch_dir}/sakila_store.csv"

df_store = spark.read.format('csv').options(header='true', inferSchema='true').load(store_csv)
display(df_store)

store_id,manager_staff_id,address_id,last_update
1,1,1,2006-02-15T04:57:12Z
2,2,2,2006-02-15T04:57:12Z


In [0]:
df_store.printSchema()

root
 |-- store_id: integer (nullable = true)
 |-- manager_staff_id: integer (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)



####Saving the Table into sakila_dlh

In [0]:
df_store.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_store")

#####Ensuring the Table was Created by Selecting all and Describing Extended

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_store;

col_name,data_type,comment
store_id,int,
manager_staff_id,int,
address_id,int,
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"store_id, manager_staff_id, address_id, last_update",
Column Selection Method,first-32,
,,
# Detailed Table Information,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_store LIMIT 5;

store_id,manager_staff_id,address_id,last_update
1,1,1,2006-02-15T04:57:12Z
2,2,2,2006-02-15T04:57:12Z


###Verify Dimension Tables

In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_payment,False
sakila_dlh,dim_store,False
,_sqldf,True
,view_customer,True
,view_date,True


##Section III: Integrate Reference Data with Real-Time Data
###Use AutoLoader to Process Streaming (Hot Path) Orders Fact Data
####Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 #.option("cloudFiles.schemaHints", "fact_order_key BIGINT")
 #.option("cloudFiles.schemaHints", "customer_key BIGINT")
 #.option("cloudFiles.schemaHints", "payment_key BIGINT")
 #.option("cloudFiles.schemaHints", "store_key BIGINT")
 #.option("cloudFiles.schemaHints", "store_id DECIMAL")
 #.option("cloudFiles.schemaHints", "amount DECIMAL")
 #.option("cloudFiles.schemaHints", "first_name STRING")
 #.option("cloudFiles.schemaHints", "last_name STRING")
 #.option("cloudFiles.schemaHints", "email STRING")
 #.option("cloudFiles.schemaHints", "address_id STRING")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

#####Adding Metadata to Tempview

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

##### Using Spark writeStream to Create fact_orders_bronze Table

In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f02a830a550>

####Silver Table
#####Using Spark to ReadStream from the Bronze Table

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

#####Ensuring the Table was Created by Selecting all and Describing Extended

In [0]:
%sql
SELECT * FROM orders_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
address_id,bigint,
amount,double,
create_date,string,
customer_id,bigint,
customer_key,bigint,
email,string,
first_name,string,
last_name,string,
last_update,string,
manager_staff_id,bigint,


####Creating a new Tempview for the Fact Orders Table and Joining on orders_silver tempview, dim_customer, dim_store, and dim_date

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT 
      p.payment_id,
      p.customer_id,
      c.first_name AS customer_first_name,
      c.last_name AS customer_last_name,
      c.store_id,
      s.manager_staff_id AS store_manager,
      p.amount AS payment_amount,
      p.payment_date AS payment_date,
      d.day_name_of_week AS payment_day_name_of_week,
      d.day_of_month AS payment_day_of_month,
      d.weekday_weekend AS payment_weekday_weekend,
      d.month_name AS payment_month_name,
      d.calendar_quarter AS payment_calendar_quarter,
      d.calendar_year AS payment_calendar_year
  FROM orders_silver_tempview AS p
  INNER JOIN sakila_dlh.dim_customer AS c
      ON p.customer_id = c.customer_id
  INNER JOIN sakila_dlh.dim_store AS s
      ON c.store_id = s.store_id
  LEFT OUTER JOIN sakila_dlh.dim_date AS d
      ON DATE(p.payment_date) = d.date_name
)

#####Using Spark writeStream to Create the fact_orders silver table

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f02d405b490>

#####Ensuring the Table was Created by Selecting all and Describing Extended

In [0]:
%sql
SELECT * FROM fact_orders_silver

payment_id,customer_id,customer_first_name,customer_last_name,store_id,store_manager,payment_amount,payment_date,payment_day_name_of_week,payment_day_of_month,payment_weekday_weekend,payment_month_name,payment_calendar_quarter,payment_calendar_year
1,1,MARY,SMITH,1,1,2.99,2005-05-25 11:30:37,,,,,,
2,1,MARY,SMITH,1,1,0.99,2005-05-28 10:35:23,,,,,,
3,1,MARY,SMITH,1,1,5.99,2005-06-15 00:54:12,,,,,,
4,1,MARY,SMITH,1,1,0.99,2005-06-15 18:02:53,,,,,,
5,1,MARY,SMITH,1,1,9.99,2005-06-15 21:08:46,,,,,,
6,1,MARY,SMITH,1,1,4.99,2005-06-16 15:18:57,,,,,,
7,1,MARY,SMITH,1,1,4.99,2005-06-18 08:41:48,,,,,,
8,1,MARY,SMITH,1,1,0.99,2005-06-18 13:33:59,,,,,,
9,1,MARY,SMITH,1,1,3.99,2005-06-21 06:24:45,,,,,,
10,1,MARY,SMITH,1,1,5.99,2005-07-08 03:17:05,,,,,,


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_orders_silver

col_name,data_type,comment
payment_id,bigint,
customer_id,bigint,
customer_first_name,varchar(65535),
customer_last_name,varchar(65535),
store_id,bigint,
store_manager,int,
payment_amount,double,
payment_date,string,
payment_day_name_of_week,varchar(10),
payment_day_of_month,tinyint,


###Gold Table: Perform Aggregations
####Create a new Gold table using the CTAS approach. 
####Output Table Should Show Top Paying Customers Per Store and the Year of Their Payments

In [0]:
%sql
SELECT 
    s.store_id,
    s.manager_staff_id AS store_manager,
    c.customer_id,
    c.first_name AS customer_first_name,
    c.last_name AS customer_last_name,
    YEAR(f.payment_date) AS payment_year,
    SUM(f.payment_amount) AS total_payment_amount
FROM fact_orders_silver_tempview AS f
INNER JOIN sakila_dlh.dim_customer AS c
    ON f.customer_id = c.customer_id
INNER JOIN sakila_dlh.dim_store AS s
    ON c.store_id = s.store_id
GROUP BY 
    s.store_id, 
    s.manager_staff_id, 
    c.customer_id, 
    c.first_name, 
    c.last_name, 
    YEAR(f.payment_date) 
ORDER BY total_payment_amount DESC;




store_id,store_manager,customer_id,customer_first_name,customer_last_name,payment_year,total_payment_amount
2,2,526,KARL,SEAL,2005.0,221.5500000000001
1,1,148,ELEANOR,HUNT,2005.0,216.5400000000001
1,1,144,CLARA,SHAW,2005.0,195.58000000000007
2,2,137,RHONDA,KENNEDY,2005.0,194.61000000000007
2,2,178,MARION,SNYDER,2005.0,189.6200000000001
1,1,459,TOMMY,COLLAZO,2005.0,186.6200000000001
2,2,469,WESLEY,BULL,2005.0,177.60000000000002
1,1,468,TIM,CARY,2005.0,175.61000000000004
1,1,236,MARCIA,DEAN,2005.0,174.59
1,1,176,JUNE,CARROLL,2005.0,173.63000000000002
