# Ben Harris - DS2002 Capstone

### Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

### Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
jdbc_hostname = "nra2je-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "capstone"

connection_properties = {
  "user" : "nra2je",
  "password" : "Wilson23185!",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox.4mvrkqg"
atlas_database_name = "capstone"
atlas_user_name = "nra2je"
atlas_password = "Passw0rd123"

# Data Files (CSV) Information ###############################
dst_database = "sales_data"

base_dir = "dbfs:/FileStore/capstone_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/sales"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

inspectors_stream_dir = f"{stream_dir}/inspector_data"
stores_stream_dir = f"{stream_dir}/stores_data"
sales_stream_dir = f"{stream_dir}/sales"

sales_output_bronze = f"{database_dir}/fact_sales/bronze"
sales_output_silver = f"{database_dir}/fact_sales/silver"
sales_output_gold   = f"{database_dir}/fact_sales/gold"


# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales", True) 

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

False

### Global Functions

In [0]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.mongodb.net/{db_name}"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

def set_mongo_collection(client, db_name, data_directory, csv_files):
    db = client[db_name]
    
    for collection_name, csv_file in csv_files.items():
        db[collection_name].drop()
        csv_path = os.path.join(data_directory, csv_file)
        df = pd.read_csv(csv_path)
        records = df.to_dict(orient='records')
        db[collection_name].insert_many(records)
    
    client.close()


### Ingest Reference Data

In [0]:
%sql
DROP DATABASE IF EXISTS capstone2 CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS capstone2
LOCATION "dbfs:/FileStore/capstone_data/capstone2"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://nra2je-mysql.mysql.database.azure.com:3306/capstone",
  dbtable "dim_date",
  user "nra2je",
  password "Wilson23185!" 
)

In [0]:
%sql
USE DATABASE capstone2;

CREATE OR REPLACE TABLE capstone2.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/lab_data/capstone2/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED capstone2.dim_date;

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,varchar(11),
date_name_us,varchar(11),
date_name_eu,varchar(11),
day_of_week,int,
day_name_of_week,varchar(10),
day_of_month,int,
day_of_year,int,
weekday_weekend,varchar(10),


In [0]:
%sql
SELECT * FROM capstone2.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


### Fetch Reference Data

In [0]:
display(dbutils.fs.ls(batch_dir))

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-2756068366050092>, line 1[0m
[0;32m----> 1[0m display([43mdbutils[49m[38;5;241;43m.[39;49m[43mfs[49m[38;5;241;43m.[39;49m[43mls[49m[43m([49m[43mbatch_dir[49m[43m)[49m)

File [0;32m/databricks/python_shell/dbruntime/dbutils.py:362[0m, in [0;36mDBUtils.FSHandler.prettify_exception_message.<locals>.f_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    360[0m exc[38;5;241m.[39m__context__ [38;5;241m=[39m [38;5;28;01mNone[39;00m
[1;32m    361[0m exc[38;5;241m.[39m__cause__ [38;5;241m=[39m [38;5;28;01mNone[39;00m
[0;32m--> 362[0m [38;5;28;01mraise[39;00m exc

[0;31mExecutionError[0m: An error occurred while calling o424.ls.
: java.io.FileNotFoundException: No such file or directory dbfs:/FileStore/capstone_data/retail/batch
	at com.databricks.back

In [0]:
source_dir = '/dbfs/FileStore/capstone_data/batch'
csv_files = {"inspectors" : 'inspector_data.csv'
              , "stores" : 'stores_data.csv'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, csv_files) 

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-2756068366050077>, line 5[0m
[1;32m      1[0m source_dir [38;5;241m=[39m [38;5;124m'[39m[38;5;124m/dbfs/FileStore/capstone_data/stream[39m[38;5;124m'[39m
[1;32m      2[0m csv_files [38;5;241m=[39m {[38;5;124m"[39m[38;5;124minspectors[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124minspector_data.csv[39m[38;5;124m'[39m
[1;32m      3[0m               , [38;5;124m"[39m[38;5;124mstores[39m[38;5;124m"[39m : [38;5;124m'[39m[38;5;124mstores_data.csv[39m[38;5;124m'[39m}
[0;32m----> 5[0m set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, csv_files) 

[0;31mTypeError[0m: set_mongo_collection() takes 4 positional arguments but 6 were given

In [0]:
%scala
import com.mongodb.spark._

val userName = "nra2je"
val pwd = "Passw0rd123"
val clusterName = "sandbox.4mvrkqg"
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority"

#### Inspector Dimension

In [0]:
%scala

val df_inspector = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "capstone")
.option("collection", "inspectors").load()
.select("Inspector_ID","First","Last","Email")

display(df_inspector)

In [0]:
%scala
df_inspector.printSchema()

In [0]:
%scala
df_inspector.write.format("delta").mode("overwrite").saveAsTable("capstone2.dim_inspector")

In [0]:
%sql
DESCRIBE EXTENDED capstone2.dim_inspector

In [0]:
%sql
SELECT * FROM capstone2.dim_inspector LIMIT 5

#### Store Dimension

In [0]:
store_csv = f"{batch_dir}/stores_data.csv"

df_store = spark.read.format('csv').options(header='true', inferSchema='true').load(store_csv)
display(df_store)

In [0]:
df_store.printSchema()

In [0]:
%scala
df_store.write.format("delta").mode("overwrite").saveAsTable("capstone2.dim_store")

In [0]:
%sql
DESCRIBE EXTENDED capstone2.dim_store

In [0]:
%sql
SELECT * FROM capstone2.dim_store LIMIT 5

#### Verify Dimension Tables

In [0]:
%sql
USE capstone2;
SHOW TABLES

### Integrate Reference and Real Time Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaLocation", sales_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(sales_stream_dir)
 .createOrReplaceTempView("sales_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW sales_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM sales_raw_tempview
)

In [0]:
%sql
SELECT * FROM sales_bronze_tempview

In [0]:
(spark.table("sales_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_bronze"))

In [0]:
(spark.readStream
  .table("fact_sales_bronze")
  .createOrReplaceTempView("sales_silver_tempview"))

In [0]:
%sql
SELECT * FROM sales_silver_tempview

In [0]:
%sql
DESCRIBE EXTENDED sales_silver_tempview

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_sales_silver_tempview AS (
  SELECT st.Type,
      st.Size,
      st.State,
      st.City,
      st.Address,
      s.Dept,
      i.First,
      i.Last,
      i.Email,
      d.day_name_of_week AS inspected_day_name_of_week,
      d.day_of_month AS inspected_day_of_month,
      d.weekday_weekend AS inspected_weekday_weekend,
      d.month_name AS inspected_month_name,
      d.calendar_quarter AS inspected_calendar_quarter,
      d.calendar_year AS inspected_calendar_year,
      s.Weekly_Sales,
     s.IsHoliday
  FROM sales_silver_tempview AS s
  INNER JOIN capstone2.dim_inspector AS i
  ON s.Inspector_id = i.Inspector_id
  INNER JOIN capstone2.dim_store as st
  ON s.Store_id = st.Store
  INNER JOIN capstone2.dim_date as d
  ON s.sale_date_key = d.date_key
)

In [0]:
(spark.table("fact_sales_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_silver"))

In [0]:
%sql
SELECT * FROM fact_sales_silver

In [0]:
%sql
DESCRIBE EXTENDED capstone2.fact_sales_silver

#### Gold Table Aggregations

In [0]:
%sql
CREATE OR REPLACE TABLE capstone2.fact_sales_by_store AS (
  SELECT store_id AS Store_Number
    , Address AS Store_Location
    , CONCAT('$', FORMAT(AVG(Weekly_Sales), 0)) AS Average_Weekly_Sales,
    , CONCAT('$', FORMAT(SUM(sWeekly_Sales), 0)) AS Total_Sales
  FROM capstone2.fact_sales_silver
  GROUP BY store_id, Address
  LIMIT 5);

SELECT * FROM capstone2.fact_sale_by_store

In [0]:
%fs rm -r /FileStore/lab_data/