In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
# Azure MySQL Server Connection Information ################### change <server_name> to my server name 
jdbc_hostname = "ds2002-whj6jg.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

#change user_name, password, DRIVER IS CORRECT 
connection_properties = {
  "user" : "meganvanderwiele",
  "password" : "Tigers11!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ######################## this is the same as that other lab, change user name and password
atlas_cluster_name = "cluster0.m0hmuwa"
atlas_database_name = "sakila"
atlas_user_name = "meganvanderwiele"
atlas_password = "Tigers11"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"


#catalog, browse dbfs, file store, lab_data, drag it in  - - might have already done this 
base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/final"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_orders", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

True

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe
    ##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

In [0]:
%sql
DROP DATABASE IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Lab 06 Database"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Lab 6.0");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds2002-whj6jg.mysql.database.azure.com:3306/sakila", --Replace with your Server Name
  dbtable "dim_date",
  user "meganvanderwiele",    --Replace with your User Name
  password "Tigers11!"  --Replace with you password
)

In [0]:
%sql
USE DATABASE sakila_dlh;

CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
display(dbutils.fs.ls(batch_dir))  #'/dbfs/FileStore/lab_data/final/batch'

path,name,size,modificationTime
dbfs:/FileStore/lab_data/final/batch/DImCustomersSakila.json,DImCustomersSakila.json,123112,1701919610000
dbfs:/FileStore/lab_data/final/batch/DImDateSakilaStream.json,DImDateSakilaStream.json,70003,1701919610000
dbfs:/FileStore/lab_data/final/batch/DImProductsSakila.json,DImProductsSakila.json,201090,1701919610000
dbfs:/FileStore/lab_data/final/batch/DimStoresSakila.csv,DimStoresSakila.csv,119,1701927490000
dbfs:/FileStore/lab_data/final/batch/Fact_Orders_Sakila_01.json,Fact_Orders_Sakila_01.json,24447,1701918607000
dbfs:/FileStore/lab_data/final/batch/Fact_Orders_Sakila_02.json,Fact_Orders_Sakila_02.json,24569,1701918607000
dbfs:/FileStore/lab_data/final/batch/Fact_Orders_Sakila_03.csv,Fact_Orders_Sakila_03.csv,8655,1701918607000
dbfs:/FileStore/lab_data/final/batch/Fact_Orders_Sakila_04.csv,Fact_Orders_Sakila_04.csv,8660,1701918607000


In [0]:
source_dir = '/dbfs/FileStore/lab_data/final/batch'
json_files = {"customers" : 'DImCustomersSakila.json',
              "products" : 'DImProductsSakila.json'
              }

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f63cd560f80>

In [0]:
%scala
import com.mongodb.spark._

val userName = "meganvanderwiele"
val pwd = "Tigers11"
val clusterName = "cluster0.m0hmuwa"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

In [0]:
%scala


val df_products = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "products").load()
.select("film_key",
"title",
"release_year",
"rental_duration",
"rental_rate",
"replacement_cost",
"last_update")


display(df_products)


film_key,title,release_year,rental_duration,rental_rate,replacement_cost,last_update
1,ACADEMY DINOSAUR,2006,6,0.99,20.99,2006-02-15 05:03:42
2,ACE GOLDFINGER,2006,3,4.99,12.99,2006-02-15 05:03:42
3,ADAPTATION HOLES,2006,7,2.99,18.99,2006-02-15 05:03:42
4,AFFAIR PREJUDICE,2006,5,2.99,26.99,2006-02-15 05:03:42
5,AFRICAN EGG,2006,6,2.99,22.99,2006-02-15 05:03:42
6,AGENT TRUMAN,2006,3,2.99,17.99,2006-02-15 05:03:42
7,AIRPLANE SIERRA,2006,6,4.99,28.99,2006-02-15 05:03:42
8,AIRPORT POLLOCK,2006,6,4.99,15.99,2006-02-15 05:03:42
9,ALABAMA DEVIL,2006,3,2.99,21.99,2006-02-15 05:03:42
10,ALADDIN CALENDAR,2006,6,4.99,24.99,2006-02-15 05:03:42


In [0]:
%scala
df_products.printSchema()


In [0]:
%scala
df_products.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_products")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_products

col_name,data_type,comment
film_key,int,
title,string,
release_year,int,
rental_duration,int,
rental_rate,double,
replacement_cost,double,
last_update,string,
,,
# Delta Statistics Columns,,
Column Names,"rental_rate, rental_duration, film_key, release_year, replacement_cost, last_update, title",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_products LIMIT 5

film_key,title,release_year,rental_duration,rental_rate,replacement_cost,last_update
1,ACADEMY DINOSAUR,2006,6,0.99,20.99,2006-02-15 05:03:42
2,ACE GOLDFINGER,2006,3,4.99,12.99,2006-02-15 05:03:42
3,ADAPTATION HOLES,2006,7,2.99,18.99,2006-02-15 05:03:42
4,AFFAIR PREJUDICE,2006,5,2.99,26.99,2006-02-15 05:03:42
5,AFRICAN EGG,2006,6,2.99,22.99,2006-02-15 05:03:42


In [0]:
%scala

val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "sakila")
.option("collection", "customers").load()
.select("customer_key",
"store_key",
"first_name",
"last_name",
"address_id",
"create_date",
"last_update")

display(df_customer)

customer_key,store_key,first_name,last_name,address_id,create_date,last_update
1,1,MARY,SMITH,5,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,6,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,7,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,8,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,9,2006-02-14 22:04:36,2006-02-15 04:57:20
6,2,JENNIFER,DAVIS,10,2006-02-14 22:04:36,2006-02-15 04:57:20
7,1,MARIA,MILLER,11,2006-02-14 22:04:36,2006-02-15 04:57:20
8,2,SUSAN,WILSON,12,2006-02-14 22:04:36,2006-02-15 04:57:20
9,2,MARGARET,MOORE,13,2006-02-14 22:04:36,2006-02-15 04:57:20
10,1,DOROTHY,TAYLOR,14,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
%scala
df_customer.printSchema()

In [0]:
%scala
df_customer.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_customer")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_customer

col_name,data_type,comment
customer_key,int,
store_key,int,
first_name,string,
last_name,string,
address_id,int,
create_date,string,
last_update,string,
,,
# Delta Statistics Columns,,
Column Names,"first_name, address_id, last_update, create_date, last_name, store_key, customer_key",


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_customer LIMIT 5

customer_key,store_key,first_name,last_name,address_id,create_date,last_update
1,1,MARY,SMITH,5,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,6,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,7,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,8,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,9,2006-02-14 22:04:36,2006-02-15 04:57:20


In [0]:
stores_csv = f"{batch_dir}/DimStoresSakila.csv"

df_stores= spark.read.format('csv').options(header='true', inferSchema='true').load(stores_csv)
display(df_stores)

my_row_id,store_id,manager_staff_id,address_id,last_update
1,1,1,1,2006-02-15T04:57:12Z
2,2,2,2,2006-02-15T04:57:12Z


In [0]:
df_stores.printSchema()

root
 |-- my_row_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- manager_staff_id: integer (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
df_stores.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_stores")

In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.dim_stores;

col_name,data_type,comment
my_row_id,int,
store_id,int,
manager_staff_id,int,
address_id,int,
last_update,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"my_row_id, store_id, address_id, manager_staff_id, last_update",
Column Selection Method,first-32,
,,


In [0]:
%sql
SELECT * FROM sakila_dlh.dim_stores LIMIT 5;

my_row_id,store_id,manager_staff_id,address_id,last_update
1,1,1,1,2006-02-15T04:57:12Z
2,2,2,2,2006-02-15T04:57:12Z


In [0]:
%sql
USE sakila_dlh;
SHOW TABLES

database,tableName,isTemporary
sakila_dlh,dim_customer,False
sakila_dlh,dim_date,False
sakila_dlh,dim_products,False
sakila_dlh,dim_stores,False
,display_query_1,True
,display_query_2,True
,display_query_3,True
,display_query_4,True
,orders_bronze_tempview,True
,orders_raw_tempview,True


In [0]:
##Section III: Integrate Reference Data with Real-Time Data
##6.0. Use AutoLoader to Process Streaming (Hot Path) Orders Rentals Data
##6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", orders_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(orders_stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql

/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customer_key,fact_order_key,inventory_key,last_update,rental_date,rental_key,return_date,staff_id,_rescued_data,receipt_time,source_file


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f63cd4237c0>

In [0]:
#silver table
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customer_key,bigint,
fact_order_key,bigint,
inventory_key,bigint,
last_update,string,
rental_date,string,
rental_key,bigint,
return_date,string,
staff_id,bigint,
_rescued_data,string,
receipt_time,timestamp,


In [0]:

%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.fact_order_key,
  o.inventory_key,
  o.customer_key,
  o.staff_id,
  o.rental_date,
  o.rental_key,
  o.return_date,
  p.rental_rate,
  p.film_key,
  p.rental_duration,
  c.first_name,
  c.last_name,
  s.store_id,
  rd.day_name_of_week AS rental_day_name_of_week,
  rd.day_of_month AS rental_day_of_month,
  rd.weekday_weekend AS rental_weekday_weekend,
  rd.month_name AS rental_month_name,
  rd.calendar_quarter AS rental_quarter,
  rd.calendar_year AS rental_year


  FROM orders_silver_tempview AS o
  INNER JOIN sakila_dlh.dim_customer AS c
  ON c.customer_key = o.customer_key
  INNER JOIN sakila_dlh.dim_stores AS s
  ON s.store_id = o.staff_id
  INNER JOIN sakila_dlh.dim_products as p
  ON p.film_key = o.inventory_key
  LEFT OUTER JOIN sakila_dlh.dim_date AS rd
  ON rd.full_date = o.return_date
)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{orders_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

<pyspark.sql.streaming.query.StreamingQuery at 0x7f63cd591c30>

In [0]:
%sql
SELECT * FROM fact_orders_silver

fact_order_key,inventory_key,customer_key,staff_id,rental_date,rental_key,return_date,rental_rate,film_key,rental_duration,first_name,last_name,store_id,rental_day_name_of_week,rental_day_of_month,rental_weekday_weekend,rental_month_name,rental_quarter,rental_year
117,213,510,1,2005-05-31 16:23:02,1118,2005-06-03 20:00:02,0.99,213,4,BEN,EASTER,1,Friday,3,Weekday,June,2,2005
122,142,448,2,2005-05-31 16:48:43,1123,2005-06-02 19:17:43,0.99,142,3,MIGUEL,BETANCOURT,2,Thursday,2,Weekday,June,2,2005
128,386,486,1,2005-05-31 18:00:48,1129,2005-06-04 23:05:48,4.99,386,3,GLEN,TALBERT,1,Saturday,4,Weekend,June,2,2005
133,143,191,2,2005-05-31 19:14:15,1134,2005-06-02 17:13:15,0.99,143,6,JEANETTE,GREENE,2,Thursday,2,Weekday,June,2,2005
138,666,562,1,2005-05-31 19:34:52,1139,2005-06-06 17:40:52,4.99,666,4,WALLACE,SLONE,1,Monday,6,Weekday,June,2,2005
146,622,259,2,2005-05-31 20:37:52,1147,2005-06-06 19:23:52,0.99,622,4,LENA,JENSEN,2,Monday,6,Weekday,June,2,2005
148,352,326,2,2005-05-31 21:03:17,1149,2005-06-08 19:58:17,0.99,352,4,JOSE,ANDREW,2,Wednesday,8,Weekday,June,2,2005
151,210,191,2,2005-05-31 21:32:17,1152,2005-06-04 21:07:17,4.99,210,3,JEANETTE,GREENE,2,Saturday,4,Weekend,June,2,2005
155,460,106,2,2005-05-31 22:37:34,1156,2005-06-01 23:02:34,4.99,460,3,CONNIE,WALLACE,2,Wednesday,1,Weekday,June,2,2005
161,987,310,1,2005-06-14 23:09:38,1162,2005-06-23 22:00:38,2.99,987,3,DANIEL,CABRAL,1,Thursday,23,Weekday,June,2,2005


In [0]:
%sql
DESCRIBE EXTENDED sakila_dlh.fact_orders_silver

col_name,data_type,comment
fact_order_key,bigint,
inventory_key,bigint,
customer_key,bigint,
staff_id,bigint,
rental_date,string,
rental_key,bigint,
return_date,string,
rental_rate,double,
film_key,int,
rental_duration,int,


In [0]:
%sql
/*Creating Gold Table With Film Rental Duration per Customer*/
CREATE OR REPLACE TABLE sakila_dlh.fact_rental_duration_by_customer AS (
  SELECT customer_key AS CustomerID
    , first_name AS FirstName
    , last_name AS LastName
    , rental_duration AS Duration
    , COUNT(rental_duration) AS DurationCount
  FROM sakila_dlh.fact_orders_silver
  GROUP BY CustomerID, LastName, FirstName, Duration
  ORDER BY Duration DESC);

SELECT * FROM sakila_dlh.fact_rental_duration_by_customer;

CustomerID,FirstName,LastName,Duration,DurationCount
396,EARL,SHANKS,7,1
303,WILLIAM,SATTERFIELD,7,1
456,RONNIE,RICKETTS,7,1
507,EDGAR,RHOADS,7,1
586,KIRK,STCLAIR,7,1
236,MARCIA,DEAN,7,1
164,JOANN,GARDNER,7,1
571,JOHNNIE,CHISHOLM,7,1
560,JORDAN,ARCHULETA,7,1
443,FRANCISCO,SKIDMORE,7,1


Databricks visualization. Run in Databricks to view.