## Project 2: Capstone
### Author: Kyra Lim, smy2qe
The goal of the second data project, building upon the first project, is to further demonstrate (1) an understanding of and (2) competence creating and implementing basic data science systems such as pipelines, scripts, data transformations, APIs, databases and cloud services.
### Section I: Prerequisites
#### Import the Necessary Libraries

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

C:\spark-3.5.4-bin-hadoop3


#### Instantiate Global Variables

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "klim",
        "password" : "pAssword123",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas", # "local"
    "user_name" : "klim",
    "password" : "pAssword123",
    "cluster_name" : "sandbox",
    "cluster_subnet" : "tn5yk",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'project_data')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

purchase_orders_stream_dir = os.path.join(stream_dir, 'purchase_orders')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

purchase_orders_output_bronze = os.path.join(database_dir, 'fact_purchase_orders', 'bronze')
purchase_orders_output_silver = os.path.join(database_dir, 'fact_purchase_orders', 'silver')
purchase_orders_output_gold = os.path.join(database_dir, 'fact_purchase_orders', 'gold')

#### Define Global Functions

In [3]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Adventureworks Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

#### Initialize Data Lakehouse Directory Structure

In [4]:
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\kyral\\Data Systems\\DS-2002-main\\Project 2 - Capstone\\spark-warehouse\\adventureworks_dlh.db' has been removed successfully."

#### Create a New Spark Session

In [5]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

#### Create a New Metadata Database

In [6]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Project 2 Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Project 2');
"""
spark.sql(sql_create_db)

DataFrame[]

### Section II: Populate Dimensions
#### Extract Date Dimension from MySQL Adventureworks Database

##### Fetch data from dim_date table in MySQL

In [7]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

##### Save as the dim_date table in the Data Lakehouse

In [8]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

##### Describe and preview Table

In [9]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Extract Product Dimension from MySQL Adventureworks Database

##### Fetch data from Product table in MySQL

In [10]:
sql_dim_products = f"SELECT * FROM {mysql_args['db_name']}.product"
df_dim_products = get_mysql_dataframe(spark, sql_dim_products, **mysql_args)

df_dim_products.toPandas().head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,1,Adjustable Race,AR-5381,False,False,,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,NaT,"[183, 21, 66, 105, 247, 8, 13, 76, 172, 177, 2...",2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,False,False,,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,NaT,"[32, 60, 174, 88, 58, 79, 73, 71, 167, 212, 21...",2004-03-11 10:01:36


##### Perform any necessary transformations

In [11]:
# Add primary key column
df_dim_products.createOrReplaceTempView("products")
sql_products = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY ProductID ORDER BY ProductID) AS product_key
    FROM products;
"""
df_dim_products = spark.sql(sql_products)

# Drop unwanted columns
df_dim_products = df_dim_products.drop('rowguid')

# Reorder columns and display the first two rows in a Pandas dataframe
ordered_columns = ['product_key', 'ProductID', 'Name', 'ProductNumber', 'MakeFlag', 'FinishedGoodsFlag',
                       'Color', 'StandardCost', 'ListPrice', 'Size', 'SizeUnitMeasureCode', 'Weight', 
                       'WeightUnitMeasureCode', 'SafetyStockLevel', 'ReorderPoint', 'DaysToManufacture',
                       'ProductLine', 'Class', 'Style', 'ProductSubcategoryID', 'ProductModelID', 
                       'SellStartDate', 'SellEndDate', 'DiscontinuedDate', 'ModifiedDate']

df_dim_products = df_dim_products[ordered_columns]
df_dim_products.toPandas().head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,StandardCost,ListPrice,Size,...,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ModifiedDate
0,1,326,Decal 2,DC-9824,False,False,,0.0,0.0,,...,0,,,,,,1998-06-01,NaT,NaT,2004-03-11 10:01:36
1,1,343,Flat Washer 2,FW-1400,False,False,,0.0,0.0,,...,0,,,,,,1998-06-01,NaT,NaT,2004-03-11 10:01:36


##### Save as the dim_product table in the Data Lakehouse

In [12]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_product", mode="overwrite")

##### Describe and preview table

In [13]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_product;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_product LIMIT 2").toPandas()

+--------------------+------------+-------+
|            col_name|   data_type|comment|
+--------------------+------------+-------+
|         product_key|         int|   NULL|
|           ProductID|         int|   NULL|
|                Name| varchar(50)|   NULL|
|       ProductNumber| varchar(25)|   NULL|
|            MakeFlag|     boolean|   NULL|
|   FinishedGoodsFlag|     boolean|   NULL|
|               Color| varchar(15)|   NULL|
|        StandardCost|      double|   NULL|
|           ListPrice|      double|   NULL|
|                Size|  varchar(5)|   NULL|
| SizeUnitMeasureCode|  varchar(3)|   NULL|
|              Weight|decimal(8,2)|   NULL|
|WeightUnitMeasure...|  varchar(3)|   NULL|
|    SafetyStockLevel|         int|   NULL|
|        ReorderPoint|         int|   NULL|
|   DaysToManufacture|         int|   NULL|
|         ProductLine|  varchar(2)|   NULL|
|               Class|  varchar(2)|   NULL|
|               Style|  varchar(2)|   NULL|
|ProductSubcategoryID|         i

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,StandardCost,ListPrice,Size,...,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ModifiedDate
0,1,2,Bearing Ball,BA-8327,False,False,,0.0,0.0,,...,0,,,,,,1998-06-01,NaT,NaT,2004-03-11 10:01:36
1,1,4,Headset Ball Bearings,BE-2908,False,False,,0.0,0.0,,...,0,,,,,,1998-06-01,NaT,NaT,2004-03-11 10:01:36


#### Extract Employee Dimension from JSON File

##### Create a new MongoDB Database and load employee JSON file in a new MongoDB Collection

In [14]:
client = get_mongo_client(**mongodb_args)

json_files = {"employee" : "adventureworks_employee.json"}

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 

##### Fetch data from the new MongoDB Employee Collection

In [15]:
mongodb_args["collection"] = "employee"

df_dim_employee = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_employee.toPandas().head(2)

Unnamed: 0,BirthDate,CurrentFlag,EmailAddress,EmailPromotion,EmployeeID,FirstName,Gender,HireDate,LastName,LoginID,ManagerID,MaritalStatus,MiddleName,NationalIDNumber,Phone,SalariedFlag,SickLeaveHours,Title,VacationHours
0,1972-05-15 00:00:00,1,guy1@adventure-works.com,0,1,Guy,M,1996-07-31 00:00:00,Gilbert,adventure-works\guy1,16.0,M,R,14417807,320-555-0195,0,30,Production Technician - WC60,21
1,1977-06-03 00:00:00,1,kevin0@adventure-works.com,2,2,Kevin,M,1997-02-26 00:00:00,Brown,adventure-works\kevin0,6.0,S,F,253022876,150-555-0189,0,41,Marketing Assistant,42


##### Perform any necessary transformations

In [16]:
# Add primary key column
df_dim_employee.createOrReplaceTempView("employee")
sql_employee = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY EmployeeID ORDER BY EmployeeID) AS employee_key
    FROM employee;
"""
df_dim_employee = spark.sql(sql_employee)

# Reorder columns and display first two rows in a Pandas dataframe
ordered_columns = ['employee_key', 'EmployeeID', 'NationalIDNumber', 'LoginID', 'ManagerID', 'FirstName', 
                       'MiddleName', 'LastName', 'Title', 'EmailAddress', 'EmailPromotion', 'Phone', 'BirthDate', 
                       'MaritalStatus', 'Gender', 'HireDate', 'SalariedFlag', 'VacationHours', 'SickLeaveHours', 'CurrentFlag']

df_dim_employee = df_dim_employee[ordered_columns]
df_dim_employee.toPandas().head(2)

Unnamed: 0,employee_key,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,14,233069302,adventure-works\taylor0,21.0,Taylor,R,Maxwell,Production Supervisor - WC50,taylor0@adventure-works.com,0,508-555-0165,1946-05-03 00:00:00,M,M,1998-03-11 00:00:00,0,79,59,1
1,1,18,494170342,adventure-works\john0,21.0,John,T,Campbell,Production Supervisor - WC60,john0@adventure-works.com,1,435-555-0113,1946-09-08 00:00:00,M,M,1998-04-18 00:00:00,0,81,60,1


##### Save as the dim_employee table in the Data lakehouse

In [17]:
df_dim_employee.write.saveAsTable(f"{dest_database}.dim_employee", mode="overwrite")

##### Describe and preview table

In [18]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employee;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employee LIMIT 2").toPandas()

+----------------+---------+-------+
|        col_name|data_type|comment|
+----------------+---------+-------+
|    employee_key|      int|   NULL|
|      EmployeeID|      int|   NULL|
|NationalIDNumber|   string|   NULL|
|         LoginID|   string|   NULL|
|       ManagerID|      int|   NULL|
|       FirstName|   string|   NULL|
|      MiddleName|   string|   NULL|
|        LastName|   string|   NULL|
|           Title|   string|   NULL|
|    EmailAddress|   string|   NULL|
|  EmailPromotion|      int|   NULL|
|           Phone|   string|   NULL|
|       BirthDate|   string|   NULL|
|   MaritalStatus|   string|   NULL|
|          Gender|   string|   NULL|
|        HireDate|   string|   NULL|
|    SalariedFlag|   string|   NULL|
|   VacationHours|      int|   NULL|
|  SickLeaveHours|      int|   NULL|
|     CurrentFlag|   string|   NULL|
+----------------+---------+-------+
only showing top 20 rows



Unnamed: 0,employee_key,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,3,509647174,adventure-works\roberto0,12,Roberto,,Tamburello,Engineering Manager,roberto0@adventure-works.com,0,212-555-0187,1964-12-13 00:00:00,M,M,1997-12-12 00:00:00,1,2,21,1
1,1,7,309738752,adventure-works\jolynn0,21,JoLynn,M,Dobney,Production Supervisor - WC60,jolynn0@adventure-works.com,1,903-555-0145,1946-02-16 00:00:00,S,F,1998-01-26 00:00:00,0,82,61,1


#### Extract Vendors Dimension from CSV File

##### Use PySpark to read data from a CSV file

In [19]:
vendors_csv = os.path.join(batch_dir, 'adventureworks_vendors.csv')
print(vendors_csv)

df_dim_vendors = spark.read.format('csv').options(header='true', inferSchema='true').load(vendors_csv)
df_dim_vendors.toPandas().head(2)

C:\Users\kyral\Data Systems\DS-2002-main\Project 2 - Capstone\project_data\adventureworks\batch\adventureworks_vendors.csv


Unnamed: 0,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


##### Perform any necessary transformations

In [20]:
# Add primary key column
df_dim_vendors.createOrReplaceTempView("vendors")
sql_vendors = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY VendorID ORDER BY VendorID) AS vendor_key
    FROM vendors;
"""
df_dim_vendors = spark.sql(sql_vendors)

# Reorder columns and display first two rows in a Pandas dataframe
ordered_columns = ['vendor_key', 'VendorID', 'AccountNumber', 'Name', 'CreditRating', 
                       'PreferredVendorStatus', 'ActiveFlag', 'AddressType', 'AddressLine1', 
                       'AddressLine2', 'City', 'StateProvinceCode', 'State_Province', 'PostalCode']

df_dim_vendors = df_dim_vendors[ordered_columns]
df_dim_vendors.toPandas().head(2)

Unnamed: 0,vendor_key,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,14,LIGHTSP0001,Light Speed,1,1,1,Main Office,298 Sunnybrook Drive,,Spring Valley,CA,California,91977
1,1,18,G&KBI0001,G & K Bicycle Corp.,1,1,1,Main Office,8981 Carmel Drive,,W. Linn,NV,Nevada,89701


##### Save as the dim_vendors table in the Data Lakehouse

In [21]:
df_dim_vendors.write.saveAsTable(f"{dest_database}.dim_vendors", mode="overwrite")

##### Describe and preview table

In [22]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_vendors;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_vendors LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          vendor_key|                 int|   NULL|
|            VendorID|                 int|   NULL|
|       AccountNumber|              string|   NULL|
|                Name|              string|   NULL|
|        CreditRating|                 int|   NULL|
|PreferredVendorSt...|                 int|   NULL|
|          ActiveFlag|                 int|   NULL|
|         AddressType|              string|   NULL|
|        AddressLine1|              string|   NULL|
|        AddressLine2|              string|   NULL|
|                City|              string|   NULL|
|   StateProvinceCode|              string|   NULL|
|      State_Province|              string|   NULL|
|          PostalCode|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|           

Unnamed: 0,vendor_key,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,3,PREMIER0001,"Premier Sport, Inc.",1,1,1,Main Office,7682 Fern Leaf Lane,,Boston,MA,Massachusetts,2113
1,1,7,MOUNTAIN0001,Mountain Works,1,0,1,Main Office,8 Rogers Ave.,,Everett,WA,Washington,98201


#### Verify Dimension Tables

In [23]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,adventureworks_dlh,dim_date,False
1,adventureworks_dlh,dim_employee,False
2,adventureworks_dlh,dim_product,False
3,adventureworks_dlh,dim_vendors,False
4,,employee,True
5,,products,True
6,,vendors,True


### Section III: Integrate Reference Data with Real-Time Data
#### Create the Bronze Layer
##### Read "raw" JSON file data into a Stream

In [24]:
df_purchase_orders_bronze = (
    spark.readStream \
    .option("schemaLocation", purchase_orders_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(purchase_orders_stream_dir)
)

df_purchase_orders_bronze.isStreaming

True

##### Write the Streaming data to a Parquet file

In [25]:
purchase_orders_checkpoint_bronze = os.path.join(purchase_orders_output_bronze, '_checkpoint')

purchase_orders_bronze_query = (
    df_purchase_orders_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("purchase_orders_bronze") \
    .trigger(availableNow = True) \
    .option("checkpointLocation", purchase_orders_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(purchase_orders_output_bronze)
)

##### Implement query monitoring

In [26]:
print(f"Query ID: {purchase_orders_bronze_query.id}")
print(f"Query Name: {purchase_orders_bronze_query.name}")
print(f"Query Status: {purchase_orders_bronze_query.status}")

Query ID: 43226836-cee2-4a91-bcd6-3e63eb5603dd
Query Name: purchase_orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [27]:
purchase_orders_bronze_query.awaitTermination()

#### Create the Silver Layer
##### Prepare role-playing dimension primary and business keys

In [28]:
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("OrderDate")).alias("order_date")
df_dim_ship_date = df_dim_date.select(col("date_key").alias("ship_date_key"), col("full_date").alias("ShipDate")).alias("ship_date")
df_dim_due_date = df_dim_date.select(col("date_key").alias("due_date_key"), col("full_date").alias("DueDate")).alias("due_date")

##### Define Silver Query to join Streaming with Batch data

In [29]:
df_fact = spark.readStream.format("parquet").load(purchase_orders_output_bronze).alias("fact")

df_purchase_orders_silver = df_fact \
    .join(df_dim_employee, "EmployeeID") \
    .join(df_dim_vendors, "VendorID") \
    .join(df_dim_products, "ProductID") \
    .join(df_dim_order_date, col("fact.OrderDate").cast(DateType()) == col("order_date.OrderDate"), "inner") \
    .join(df_dim_ship_date, col("fact.ShipDate").cast(DateType()) == col("ship_date.ShipDate"), "left_outer") \
    .join(df_dim_due_date, col("fact.DueDate").cast(DateType()) == col("due_date.DueDate"), "left_outer") \
    .select(
        col("PurchaseOrderID").cast(LongType()), \
        df_dim_employee.EmployeeID.cast(LongType()), \
        df_dim_vendors.VendorID.cast(LongType()), \
        df_dim_products.ProductID.cast(LongType()), \
        df_dim_ship_date.ship_date_key.cast(LongType()), \
        df_dim_order_date.order_date_key.cast(LongType()), \
        df_dim_due_date.due_date_key.cast(LongType()), \
        col("RevisionNumber"), \
        col("Status"), \
        col("OrderQty"), \
        col("UnitPrice"), \
        col("LineTotal"), \
        col("SubTotal"), \
        col("TaxAmt"), \
        col("TotalDue"), \
        col("ShipMethod"), \
        col("ShipBase"), \
        col("ShipRate"), \
        col("Freight"), \
        col("ReceivedQty"), \
        col("RejectedQty"), \
        col("StockedQty"), \
        col("receipt_time"), \
        col("source_file") \
    )

In [30]:
df_purchase_orders_silver.isStreaming

True

In [31]:
df_purchase_orders_silver.printSchema()

root
 |-- PurchaseOrderID: long (nullable = true)
 |-- EmployeeID: long (nullable = true)
 |-- VendorID: long (nullable = true)
 |-- ProductID: long (nullable = true)
 |-- ship_date_key: long (nullable = true)
 |-- order_date_key: long (nullable = true)
 |-- due_date_key: long (nullable = true)
 |-- RevisionNumber: long (nullable = true)
 |-- Status: long (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- ShipBase: double (nullable = true)
 |-- ShipRate: double (nullable = true)
 |-- Freight: double (nullable = true)
 |-- ReceivedQty: double (nullable = true)
 |-- RejectedQty: double (nullable = true)
 |-- StockedQty: double (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



##### Write the Transformed Streaming data to the Data Lakehouse

In [32]:
purchase_orders_checkpoint_silver = os.path.join(purchase_orders_output_silver, '_checkpoint')

purchase_orders_silver_query = (
    df_purchase_orders_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("purchase_orders_silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", purchase_orders_checkpoint_silver) \
    .start(purchase_orders_output_silver)
)

##### Implement query monitoring

In [33]:
print(f"Query ID: {purchase_orders_silver_query.id}")
print(f"Query Name: {purchase_orders_silver_query.name}")
print(f"Query Status: {purchase_orders_silver_query.status}")

Query ID: b6f57ec1-0427-48db-a2b8-9e6211bd21d1
Query Name: purchase_orders_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [34]:
purchase_orders_silver_query.awaitTermination()

#### Create the Gold Layer

##### Define a query to create a business report

In [35]:
df_fact_pos_products_per_vendor_gold = spark.readStream.format("parquet").load(purchase_orders_output_silver) \
    .join(df_dim_products.alias("products"), "ProductID") \
    .join(df_dim_vendors.alias("vendors"), "VendorID") \
    .groupBy(col("products.Name").alias("ProductName"), col("vendors.Name").alias("VendorName")) \
    .agg(
        sum("OrderQty").alias("Total_Quantity_Ordered"),
        sum("LineTotal").alias("Total_Line_cost"),
        sum("TotalDue").alias("Total_Amount_Due")) \
    .orderBy(desc("Total_Quantity_Ordered"))

##### Write Streaming data to memory in "complete" mode

In [36]:
purchase_orders_gold_query = (
    df_fact_pos_products_per_vendor_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("fact_pos_products_per_vendor") \
    .start()
)

In [37]:
wait_until_stream_is_ready(purchase_orders_gold_query, 1)

The stream has processed 1 batchs


##### Query the Gold data from memory

In [38]:
df_fact_pos_products_per_vendor = spark.sql("SELECT * FROM fact_pos_products_per_vendor")
df_fact_pos_products_per_vendor.printSchema()

root
 |-- ProductName: string (nullable = true)
 |-- VendorName: string (nullable = true)
 |-- Total_Quantity_Ordered: long (nullable = true)
 |-- Total_Line_cost: double (nullable = true)
 |-- Total_Amount_Due: double (nullable = true)



##### Create the final selection

In [39]:
df_fact_pos_products_per_vendor_gold_final = df_fact_pos_products_per_vendor \
    .select(col("VendorName"), \
            col("ProductName"), \
            col("Total_Quantity_Ordered"), \
            col("Total_Line_cost"), \
            col("Total_Amount_Due"))

##### Load the final results into a new table and display the rsults

In [40]:
df_fact_pos_products_per_vendor_gold_final.write.saveAsTable(f"{dest_database}.fact_pos_products_per_vendor", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_pos_products_per_vendor").toPandas()

Unnamed: 0,VendorName,ProductName,Total_Quantity_Ordered,Total_Line_cost,Total_Amount_Due
0,SUPERSALES INC.,Decal 1,8750,1837.5000,4060.8750
1,SUPERSALES INC.,Decal 2,8750,1837.5000,4060.8750
2,Anderson's Custom Bikes,Touring Rim,4400,117024.6000,129312.1832
3,Circuit Cycles,Rear Derailleur Cage,4400,25410.0000,61771.7104
4,Capital Road Cycles,LL Spindle/Axle,4400,47216.4000,120837.7168
...,...,...,...,...,...
401,Bergeron Off-Roads,Lock Nut 1,3,149.5620,734.0554
402,WestAmerica Bicycle Co.,Thin-Jam Hex Nut 3,3,136.4895,855.2203
403,Cruger Bike Company,Hex Nut 16,3,110.9115,655.0424
404,Inner City Bikes,Lock Washer 2,3,123.6690,410.2064


#### Stop the Spark Session

In [41]:
spark.stop()