In [0]:
%run ./3.0-Lab-setup

##DS-2002 Capstone Project F22
**Adam Snyder**

###Section 1: Prerequisites

#####1.1: Import Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#####1.2 Global Variables

In [0]:
# Azure SQL Server Connection Information
jdbc_hostname = "ds-2002-mysql-ars.mysql.database.azure.com"
jdbc_port = 3306
src_database = "bank_db"

connection_properties = {
  "user" : "adamsnyder",
  "password" : "P@ssw0rd123",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information
atlas_cluster_name = "cluster0"
atlas_database_name = "bank_db"
atlas_user_name = "adamsnyder"
atlas_password = "AccessMongo"

# Data Files (JSON) Information
dst_database = "Bank_DW"

base_dir = "dbfs:/FileStore/snyder-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_transactions/bronze"
output_silver = f"{database_dir}/fact_transactions/silver"
output_gold   = f"{database_dir}/fact_transactions/gold"

# Delete the Streaming Files
dbutils.fs.rm(f"{database_dir}/fact_transactions", True)

# Delete the Database Files
dbutils.fs.rm(database_dir, True)

#####1.3 Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
    jdbcUrl = f"jdbc:sqlserver://{host_name}:{port};database={db_name}"
    
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

###Section 2: Azure SQL Database

#####2.1 Create a Database in Databricks and an Account Dimension Table That Sources its Data from a View in an Azure SQL Database

In [0]:
%sql
DROP DATABASE IF EXISTS Bank_DW CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS Bank_DW
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/snyder-capstone/Bank_DW"
WITH DBPROPERTIES (contains_pii = true, purpose = "Capstone Project");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_account
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-mysql-ars.mysql.database.azure.com:3306/bank_db",
  dbtable "ACCOUNT",
  user "adamsnyder",
  password "AccessAzure0"
);

In [0]:
%sql
USE DATABASE Bank_DW;

CREATE TABLE IF NOT EXISTS Bank_DW.dim_account
COMMENT "Account Dimension Table"
LOCATION "dbfs:/FileStore/snyder-capstone/Bank_DW/dim_account"
AS SELECT * FROM view_account;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM Bank_DW.dim_account LIMIT 5

ACCOUNT_ID,AVAIL_BALANCE,CLOSE_DATE,LAST_ACTIVITY_DATE,OPEN_DATE,PENDING_BALANCE,STATUS,CUST_ID,OPEN_BRANCH_ID,OPEN_EMP_ID,PRODUCT_CD
1,1057.75,,2005-01-04,2000-01-15,1057.75,ACTIVE,1,2,10,CHK
2,500.0,,2004-12-19,2000-01-15,500.0,ACTIVE,1,2,10,SAV
3,3000.0,,2004-06-30,2004-06-30,3000.0,ACTIVE,1,2,10,CD
4,2258.02,,2004-12-27,2001-03-12,2258.02,ACTIVE,2,2,10,CHK
5,200.0,,2004-12-11,2001-03-12,200.0,ACTIVE,2,2,10,SAV


In [0]:
%sql
DESCRIBE EXTENDED Bank_DW.dim_account;

col_name,data_type,comment
ACCOUNT_ID,int,
AVAIL_BALANCE,double,
CLOSE_DATE,date,
LAST_ACTIVITY_DATE,date,
OPEN_DATE,date,
PENDING_BALANCE,double,
STATUS,string,
CUST_ID,int,
OPEN_BRANCH_ID,int,
OPEN_EMP_ID,int,


#####2.2 Create a Transaction Fact Table That Sources its Data from a View in an Azure SQL Database

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_txn
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002-mysql-ars.mysql.database.azure.com:3306/bank_db",
  dbtable "ACC_TRANSACTION",
  user "adamsnyder",
  password "AccessAzure0"
);

In [0]:
%sql
USE DATABASE Bank_DW;

CREATE TABLE IF NOT EXISTS Bank_DW.fact_txn
COMMENT "Transaction Fact Table"
LOCATION "dbfs:/FileStore/snyder-capstone/Bank_DW/fact_txn"
AS SELECT * FROM view_txn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM Bank_DW.fact_txn LIMIT 5

TXN_ID,AMOUNT,FUNDS_AVAIL_DATE,TXN_DATE,TXN_TYPE_CD,ACCOUNT_ID,EXECUTION_BRANCH_ID,TELLER_EMP_ID
1,100.0,2000-01-15T00:00:00.000+0000,2000-01-15T00:00:00.000+0000,CDT,1,,
2,100.0,2000-01-15T00:00:00.000+0000,2000-01-15T00:00:00.000+0000,CDT,2,,
3,100.0,2004-06-30T00:00:00.000+0000,2004-06-30T00:00:00.000+0000,CDT,3,,
4,100.0,2001-03-12T00:00:00.000+0000,2001-03-12T00:00:00.000+0000,CDT,4,,
5,100.0,2001-03-12T00:00:00.000+0000,2001-03-12T00:00:00.000+0000,CDT,5,,


In [0]:
%sql
DESCRIBE EXTENDED Bank_DW.fact_txn;

col_name,data_type,comment
TXN_ID,bigint,
AMOUNT,double,
FUNDS_AVAIL_DATE,timestamp,
TXN_DATE,timestamp,
TXN_TYPE_CD,string,
ACCOUNT_ID,int,
EXECUTION_BRANCH_ID,int,
TELLER_EMP_ID,int,
,,
# Partitioning,,


###Section 3: MongoDB Atlas

#####3.1 Fetch Data From MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_employee = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "bank_db").option("collection", "EMPLOYEE").load()
display(df_employee)

ASSIGNED_BRANCH_ID,DEPT_ID,EMP_ID,END_DATE,FIRST_NAME,LAST_NAME,START_DATE,SUPERIOR_EMP_ID,TITLE,_id
1,3,1,,Michael,Smith,2001-06-22,,President,List(6387f79d8c887be294b56aca)
1,3,2,,Susan,Barker,2002-09-12,1.0,Vice President,List(6387f79d8c887be294b56acb)
1,3,3,,Robert,Tyler,2000-02-09,1.0,Treasurer,List(6387f79d8c887be294b56acc)
1,1,4,,Susan,Hawthorne,2002-04-24,3.0,Operations Manager,List(6387f79d8c887be294b56acd)
1,2,5,,John,Gooding,2003-11-14,4.0,Loan Manager,List(6387f79d8c887be294b56ace)
1,1,6,,Helen,Fleming,2004-03-17,4.0,Head Teller,List(6387f79d8c887be294b56acf)
1,1,7,,Chris,Tucker,2004-09-15,6.0,Teller,List(6387f79d8c887be294b56ad0)
1,1,8,,Sarah,Parker,2002-12-02,6.0,Teller,List(6387f79d8c887be294b56ad1)
1,1,9,,Jane,Grossman,2002-05-03,6.0,Teller,List(6387f79d8c887be294b56ad2)
2,1,10,,Paula,Roberts,2002-07-27,4.0,Head Teller,List(6387f79d8c887be294b56ad3)


In [0]:
%scala
df_employee.printSchema()

#####3.2 Write the New Dataframe into the Databricks Bank_DW Database

In [0]:
%scala
df_employee.write.format("delta").mode("overwrite").saveAsTable("Bank_DW.dim_employee")

In [0]:
%sql
SELECT * FROM Bank_DW.dim_employee LIMIT 5

In [0]:
%sql
DESCRIBE Bank_DW.dim_employee;

col_name,data_type,comment
ASSIGNED_BRANCH_ID,int,
DEPT_ID,int,
EMP_ID,int,
END_DATE,void,
FIRST_NAME,string,
LAST_NAME,string,
START_DATE,string,
SUPERIOR_EMP_ID,int,
TITLE,string,
_id,struct,


#####3.3 Query the New Employee Dimension in Databricks Database

In [0]:
%sql
SELECT * FROM Bank_DW.dim_employee LIMIT 5

###Section 4: File System

#####4.1 Use PySpark to Read From JSON File

In [0]:
branch_csv = f"{base_dir}/bank_branch.csv"

df_branch = spark.read.format('csv').options(header='true', inferSchema='true').load(branch_csv)
display(df_branch)

BRANCH_ID,ADDRESS,CITY,NAME,STATE,ZIP_CODE
1,3882 Main St.,Waltham,Headquarters,MA,2451
2,422 Maple St.,Woburn,Woburn Branch,MA,1801
3,125 Presidential Way,Quincy,Quincy Branch,MA,2169
4,378 Maynard Ln.,Salem,So. NH Branch,NH,3079


In [0]:
df_branch.printSchema()

In [0]:
df_branch.write.format("delta").mode("overwrite").saveAsTable("Bank_DW.dim_branch")

In [0]:
%sql
DESCRIBE EXTENDED Bank_DW.dim_branch;

col_name,data_type,comment
BRANCH_ID,int,
ADDRESS,string,
CITY,string,
NAME,string,
STATE,string,
ZIP_CODE,int,
,,
# Partitioning,,
Not partitioned,,
,,


In [0]:
%sql
SELECT * FROM Bank_DW.dim_branch LIMIT 5;

BRANCH_ID,ADDRESS,CITY,NAME,STATE,ZIP_CODE
1,3882 Main St.,Waltham,Headquarters,MA,2451
2,422 Maple St.,Woburn,Woburn Branch,MA,1801
3,125 Presidential Way,Quincy,Quincy Branch,MA,2169
4,378 Maynard Ln.,Salem,So. NH Branch,NH,3079


###Section 5: Verify Dimension Tables

In [0]:
%sql
USE Bank_DW;
SHOW TABLES

database,tableName,isTemporary
bank_dw,dim_account,False
bank_dw,dim_branch,False
bank_dw,dim_employee,False
bank_dw,fact_txn,False
,view_account,True
,view_txn,True


###Section 6: Inegrate Reference Data with Real-Time Data

#####6.1 Bronze Table: Process Raw JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "TxnID INT")
 .option("cloudFiles.schemaHints", "Amount INT")
 .option("cloudFiles.schemaHints", "TxnDate TIMESTAMP")
 .option("cloudFiles.schemaHints", "TxnType STRING")
 .option("cloudFiles.schemaHints", "AccKey INT")
 .option("cloudFiles.schemaHints", "BranchKey INT")
 .option("cloudFiles.schemaHints", "EmpKey INT")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("txns_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM txns_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

ADDRESS,BRANCH_ID,CITY,NAME,STATE,ZIP_CODE,id,EmpKey,_rescued_data,receipt_time,source_file
3882 Main St.,1.0,Waltham,Headquarters,MA,2451.0,1.0,,,2022-12-11T22:45:53.194+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
422 Maple St.,2.0,Woburn,Woburn Branch,MA,1801.0,2.0,,,2022-12-11T22:45:53.194+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
125 Presidential Way,3.0,Quincy,Quincy Branch,MA,2169.0,3.0,,,2022-12-11T22:45:53.194+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
378 Maynard Ln.,4.0,Salem,So. NH Branch,NH,3079.0,4.0,,,2022-12-11T22:45:53.194+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
,,,,,,5.0,,,2022-12-11T22:45:53.194+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
,,,,,,,,,2022-12-11T22:45:53.194+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.csv


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

ADDRESS,BRANCH_ID,CITY,NAME,STATE,ZIP_CODE,id,EmpKey,_rescued_data,receipt_time,source_file
3882 Main St.,1.0,Waltham,Headquarters,MA,2451.0,1.0,,,2022-12-11T22:47:32.402+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
422 Maple St.,2.0,Woburn,Woburn Branch,MA,1801.0,2.0,,,2022-12-11T22:47:32.402+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
125 Presidential Way,3.0,Quincy,Quincy Branch,MA,2169.0,3.0,,,2022-12-11T22:47:32.402+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
378 Maynard Ln.,4.0,Salem,So. NH Branch,NH,3079.0,4.0,,,2022-12-11T22:47:32.402+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
,,,,,,5.0,,,2022-12-11T22:47:32.402+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.json
,,,,,,,,,2022-12-11T22:47:32.402+0000,dbfs:/FileStore/snyder-capstone/source_data/stream/bank_branch.csv


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
ADDRESS,string,
BRANCH_ID,string,
CITY,string,
NAME,string,
STATE,string,
ZIP_CODE,string,
id,bigint,
EmpKey,int,
_rescued_data,string,
receipt_time,timestamp,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.id as Txn_ID
    , t.BRANCH_ID
    , br.NAME
    , br.ADDRESS
    , br.CITY
    , br.STATE
    , br.ZIP_CODE
  FROM orders_silver_tempview t
  INNER JOIN Bank_DW.dim_branch br
  ON t.BRANCH_ID = br.BRANCH_ID)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

In [0]:
%sql
SELECT * FROM fact_orders_silver

Txn_ID,BRANCH_ID,NAME,ADDRESS,CITY,STATE,ZIP_CODE
1,1,Headquarters,3882 Main St.,Waltham,MA,2451
2,2,Woburn Branch,422 Maple St.,Woburn,MA,1801
3,3,Quincy Branch,125 Presidential Way,Quincy,MA,2169
4,4,So. NH Branch,378 Maynard Ln.,Salem,NH,3079


In [0]:
%sql
DESCRIBE EXTENDED Bank_DW.fact_orders_silver

col_name,data_type,comment
Txn_ID,bigint,
BRANCH_ID,string,
NAME,string,
ADDRESS,string,
CITY,string,
STATE,string,
ZIP_CODE,int,
,,
# Partitioning,,
Not partitioned,,


##### 6.4. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT STATE as LocationOfSale
  , COUNT(Txn_ID) AS TxnCount
FROM Bank_DW.fact_orders_silver
GROUP BY STATE
ORDER BY TxnCount DESC

LocationOfSale,TxnCount
MA,3
NH,1
