# Final Project for DS2002 - By Gabriel Jacksnon (tbp8gx)

## Summary:
 - I'm using adventureworks as the source dataset. I'm using a schema similar to the schema from the midterm project.
 - Schema:
    - DimCustomers - Read from MySQL
    - DimDates - Read from MySQL
    - DimProducts - Read from MongoDB atlas (stored on the cloud's file storage)
    - DimTerritories - Read from MongoDB atlas (stored on the cloud's file storage)
    - FactSaleOrders - 3 files of data that are streamed locally 

## Design Requirements:
- I have three dimension tables (dimCustomers, dimProducts, dimTerritories).
- I use the date dimension (dimDate).
- I use FactSaleOrders as my fact table that models sales.
- My solution uses dimensions from various sources
    1) I use MySQL for the relational database
    2) I use Mongo for the NoSQL database
    3) I use MongoDB Atlas to pull files from a cloud-based file system. 
- I use static data for the dimension data and dynamic data for the fact data, which can be seen in the data/adventureworks/streaming directory.
- I execute a SELECT query to summarize the top 10 buying customers in the US territory.

## Functional Requirements:
- I use 1 batch execution from the data/adventureworks/streaming files.
- I use Spark AutoLoader to load fact sales source data from data/adventureworks/streaming location with sales1.json, sales2.json, and sales3.json as the 3 intervals of source data.
- I implement this with bronze, silver, and gold-level architecture.

## Procedure:
1. Set up environment to connect to MySQL and MongoDB. Initialize Data Lakehouse files.
2. Load (static) dimensions
3. Perform bronze-level actions
4. Perform silver-level actions
5. Perform gold-level actions

# STEP 1 - SET UP ENVIRONMENT

In [1]:
# 1.1 Handle imports/Runtime issues
import findspark
findspark.init()

import os
import json
import numpy
import datetime
import certifi
import pandas as pd
import sys
import time
import shutil

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.40
Running PyMongo Version: 4.12.1


In [2]:
# 1.2 Specify connection and initialization information

# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "root",
        "password" : "password",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "user_name" : "Cluster94547",
    "password" : "cWdKdENBS2pj",
    "cluster_name" : "Cluster94547",
    "cluster_subnet" : "hp0v9",
    "cluster_location" : "atlas", # "local"
    "db_name" : "final"
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'data')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

trans_output_bronze = os.path.join(database_dir, 'fact_trans', 'bronze')
trans_output_silver = os.path.join(database_dir, 'fact_trans', 'silver')
trans_output_gold = os.path.join(database_dir, 'fact_trans', 'gold')

In [3]:
# 1.3 Specify helper functions (provided from the class)

def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']

    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)

    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."

    except Exception as e:
        return f"An error occurred: {e}"


def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold]
    df_dropped = df.drop(*columns_with_nulls)

    return df_dropped


def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"

    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", args['conn_props']['driver']) \
        .option("user", args['conn_props']['user']) \
        .option("password", args['conn_props']['password']) \
        .option("query", sql_query) \
        .load()

    return dframe


def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")

    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "

    sparkConf_args = {
        "app_name" : "PySpark Northwind Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }

    return sparkConf_args


def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name']) \
        .setMaster(args['worker_threads']) \
        .set('spark.driver.memory', '4g') \
        .set('spark.executor.memory', '2g') \
        .set('spark.jars', args['spark_jars']) \
        .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
        .set('spark.mongodb.input.uri', args['mongo_uri']) \
        .set('spark.mongodb.output.uri', args['mongo_uri']) \
        .set('spark.sql.adaptive.enabled', 'false') \
        .set('spark.sql.debug.maxToStringFields', 35) \
        .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
        .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
        .set('spark.sql.streaming.schemaInference', 'true') \
        .set('spark.sql.warehouse.dir', args['database_dir']) \
        .set('spark.streaming.stopGracefullyOnShutdown', 'true')

    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)

    else:
        raise Exception("A MongoDB Client could not be created.")

    return client


def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]

    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    mongo_client.close()


def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')

    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])

    return dframe

In [4]:
# 1.4 Populate MongoDB with source data (products.json, territories.json)

client = get_mongo_client(**mongodb_args)

json_files = {
    "products": "products.json",
    "territories": "territories.json"
}

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files)

In [5]:
# 1.5 Remove any data in data lakehouse directory
remove_directory_tree(database_dir)

"Directory '/Users/gabooj/IdeaProjects/final-ds/spark-warehouse/adventureworks_dlh.db' has been removed successfully."

In [6]:
# 1.6 Create a new spark session

worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.3.0.jar")
jars.append(mysql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

25/05/09 17:05:06 WARN Utils: Your hostname, Gabes-iMac.local resolves to a loopback address: 127.0.0.1; using 192.168.1.29 instead (on interface en1)
25/05/09 17:05:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/gabooj/.ivy2/cache
The jars for the packages stored in: /Users/gabooj/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8ade8d8f-4afc-4acc-96e6-a60e745d88b9;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/gabooj/IdeaProjects/final-ds/venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 113ms :: artifacts dl 5ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
	---------------------------------------------------------------------
:: retrieving ::

In [7]:
# 1.7 Create a new metadata database

spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Final Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final');
"""
spark.sql(sql_create_db)

DataFrame[]

# STEP 2. LOAD (STATIC/"COLD PATH") DIMENSIONS

In [8]:
# 2.1 Populate the Date Dimension
sql_dim_date = f"SELECT date_key, full_date, date_name FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

# 2.2 Save the Date Dimension
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")
df_dim_date.toPandas().head(2)

                                                                                

Unnamed: 0,date_key,full_date,date_name
0,20000101,2000-01-01,2000/01/01
1,20000102,2000-01-02,2000/01/02


In [9]:
# 2.3 Populate the Customers Dimension by executing a MySQL query. This uses joins to collect information from the 'adventureworks' DB.
sql_customers = """
    SELECT 
        c.CustomerID,
        contact.FirstName,
        contact.LastName,
        contact.EmailAddress,
        contact.Phone,
        c.AccountNumber,
        c.CustomerType,
        a.AddressLine1,
        a.AddressLine2,
        a.City,
        a.PostalCode,
        sp.Name AS StateProvince,
        cr.Name AS Country
    FROM adventureworks.customer AS c
    LEFT OUTER JOIN customeraddress AS ca
    ON c.CustomerID = ca.CustomerID
    LEFT OUTER JOIN address AS a 
    ON ca.AddressID = a.AddressID
    LEFT OUTER JOIN addresstype AS t
	ON ca.AddressTypeID = t.AddressTypeID
	LEFT OUTER JOIN stateprovince AS sp
	ON sp.StateProvinceID = a.StateProvinceID
	LEFT OUTER JOIN countryregion AS cr
	ON sp.CountryRegionCode = cr.CountryRegionCode
	LEFT OUTER JOIN individual AS i
	ON c.CustomerID = i.CustomerID
	LEFT OUTER JOIN contact
	ON i.ContactID = contact.ContactID
"""
df_dim_customers = get_mysql_dataframe(spark, sql_customers, **mysql_args)
df_dim_customers.toPandas().head(2)

# ***** Please note that the first customer records do not have all of their information,
# ***** which is why the head of the table (below) is missing some information.

Unnamed: 0,CustomerID,FirstName,LastName,EmailAddress,Phone,AccountNumber,CustomerType,AddressLine1,AddressLine2,City,PostalCode,StateProvince,Country
0,1,,,,,AW00000001,S,2251 Elliot Avenue,,Seattle,98104,Washington,United States
1,2,,,,,AW00000002,S,7943 Walnut Ave,,Renton,98055,Washington,United States


In [10]:
# 2.4 Transform df_dim_customers (add Key from temp view)
df_dim_customers.createOrReplaceTempView("customers")
sql_customers = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY CustomerID ORDER BY CustomerID) AS CustomerKey
    FROM customers;
"""
df_dim_customers = spark.sql(sql_customers)

# 2.5 Save df_dim_customers
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

# 2.6 Demonstrate that df_dim_customers exists in Spark
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

Unnamed: 0,CustomerID,FirstName,LastName,EmailAddress,Phone,AccountNumber,CustomerType,AddressLine1,AddressLine2,City,PostalCode,StateProvince,Country,CustomerKey
0,1,,,,,AW00000001,S,2251 Elliot Avenue,,Seattle,98104,Washington,United States,1
1,3,,,,,AW00000003,S,12345 Sterling Avenue,,Irving,75061,Texas,United States,1


In [11]:
# 2.7 Populate the Products Dimension
df_dim_products = get_mongodb_dataframe(spark, db_name="final", collection="products", null_column_threshold=0.4)
df_dim_products.toPandas().head()

                                                                                

Unnamed: 0,DaysToManufacture,FinishedGoodsFlag,ListPrice,MakeFlag,Name,ProductID,ProductNumber,ReorderPoint,SafetyStockLevel,SellStartDate,StandardCost
0,0,0,0.0,0,Adjustable Race,1,AR-5381,750,1000,1998-06-01 00:00:00,0.0
1,0,0,0.0,0,Bearing Ball,2,BA-8327,750,1000,1998-06-01 00:00:00,0.0
2,1,0,0.0,1,BB Ball Bearing,3,BE-2349,600,800,1998-06-01 00:00:00,0.0
3,0,0,0.0,0,Headset Ball Bearings,4,BE-2908,600,800,1998-06-01 00:00:00,0.0
4,1,0,0.0,1,Blade,316,BL-2036,600,800,1998-06-01 00:00:00,0.0


In [12]:
# 2.8 Transform the df_dim_products (add Key from temp view)
df_dim_products.createOrReplaceTempView("products")
sql_products = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY ProductID ORDER BY ProductID) AS ProductKey
    FROM products;
"""
df_dim_products = spark.sql(sql_products)

# 2.9 Save df_dim_products
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

# 2.10 Demonstrate that df_dim_products exists in Spark
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

Unnamed: 0,DaysToManufacture,FinishedGoodsFlag,ListPrice,MakeFlag,Name,ProductID,ProductNumber,ReorderPoint,SafetyStockLevel,SellStartDate,StandardCost,ProductKey
0,0,0,0.0,0,Decal 2,326,DC-9824,750,1000,1998-06-01 00:00:00,0.0,1
1,0,0,0.0,0,Hex Nut 12,384,HN-3816,750,1000,1998-06-01 00:00:00,0.0,1


In [13]:
# 2.11 Populate the Territories dimension
df_dim_territories = get_mongodb_dataframe(spark, db_name="final", collection="territories", null_column_threshold=0.4)
df_dim_territories.toPandas().head()

Unnamed: 0,CountryRegionCode,Group,TerritoryID,TerritoryName
0,US,North America,1,Northwest
1,US,North America,2,Northeast
2,US,North America,3,Central
3,US,North America,4,Southwest
4,US,North America,5,Southeast


In [14]:
# 2.12 Transform the df_dim_territories (add Key from temp view)
df_dim_territories.createOrReplaceTempView("territories")
sql_territories = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY TerritoryID ORDER BY TerritoryID) AS TerritoryKey
    FROM territories;
"""
df_dim_territories = spark.sql(sql_territories)

# 2.13 Save df_dim_products
df_dim_territories.write.saveAsTable(f"{dest_database}.dim_territories", mode="overwrite")

# 2.14 Demonstrate that df_dim_products exists in Spark
spark.sql(f"SELECT * FROM {dest_database}.dim_territories LIMIT 2").toPandas()

Unnamed: 0,CountryRegionCode,Group,TerritoryID,TerritoryName,TerritoryKey
0,US,North America,1,Northwest,1
1,US,North America,3,Central,1


# STEP 3 - BRONZE PHASE (Reading raw data from streamed files)

In [15]:
# BRONZE PHASE 

# 3.1 Read streamed files 
df_trans_bronze = spark.readStream \
    .option("schemaLocation", trans_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", True) \
    .json(stream_dir)

# 3.2 Write 'bronze' dataframe to parquet file 
trans_checkpoint_bronze = os.path.join(trans_output_bronze, "_checkpoint")
trans_bronze_query = (df_trans_bronze
                      .withColumn("receivedTime", current_timestamp())
                      .withColumn("sourceFile", input_file_name())
                      .writeStream.format("parquet")
                      .outputMode("append")
                      .queryName("trans_bronze")
                      .trigger(availableNow=True)
                      .option("checkpointLocation", trans_checkpoint_bronze)
                      .option("compression", "snappy").
                      start(trans_output_bronze))

# 3.3 Wait for completion of the bronze job
trans_bronze_query.awaitTermination()
df_trans_bronze.printSchema()

# 3.4 Show number of records
df_output = spark.read.parquet(trans_output_bronze)
print(f"Row count: {df_output.count()}")

root
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- Credit Card ExpMonth: long (nullable = true)
 |-- Credit Card ExpYear: long (nullable = true)
 |-- Credit Card Number: string (nullable = true)
 |-- Credit Card Type: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- Freight: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- ProductID: long (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- SalesOrderID: long (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesPersonID: long (nullable = true)
 |-- ShipBase: double (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- ShipRate: double (nullable = true)
 |-- Status: long (nu

# STEP 4 - SILVER PHASE (Integrating "Cold-Path" Data with Bronze layer)

In [16]:
# SILVER PHASE 

# 4.1 Read bronze output into a stream
df_trans_silver = (spark.readStream
    .format("parquet")
    .load(trans_output_bronze)
    # Join dimensional tables 
    .join(df_dim_products.alias("product"), "ProductID")
    .join(df_dim_customers.alias("customer"), "CustomerID")
    .join(df_dim_territories.alias("territory"), "TerritoryID")

    # Join dimensional date table for each 'Date' in Fact Table
    .join(
        df_dim_date.withColumn("FullShipDate", col("full_date")).alias("dim_date"),
        (col("dim_date.FullShipDate").cast(DateType()) == from_unixtime(col("ShipDate") / 1000).cast(DateType())), 
        "left"
    )
    .join(
        df_dim_date.withColumn("FullOrderDate", col("full_date")).alias("dim_date2"),
        (col("dim_date2.FullOrderDate").cast(DateType()) == from_unixtime(col("OrderDate") / 1000).cast(DateType())),
        "left"
    )
    .join(
        df_dim_date.withColumn("FullDueDate", col("full_date")).alias("dim_date3"),
        (col("dim_date3.FullDueDate").cast(DateType()) == from_unixtime(col("DueDate") / 1000).cast(DateType())),
        "left"
    )

    # Ensure correct typing of dates
    .withColumn("DueDate", col("DueDate").cast(DateType()))
    .withColumn("OrderDate", col("OrderDate").cast(DateType()))
    .withColumn("ShipDate", col("ShipDate").cast(DateType()))

    # Drop any redundant data (from joined tables)
    .drop("date_name")
    .drop("date_key")
    .drop("full_date")              
)

# 4.2 Write 'silver' dataframe to parquet file 
trans_checkpoint_silver = os.path.join(trans_output_silver, "_checkpoint")
trans_silver_query = (
    df_trans_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("trans_silver")
    .trigger(availableNow=True)
    .option("checkpointLocation", trans_checkpoint_silver)
    .option("compression", "snappy")
    .start(trans_output_silver)
)

# 4.3 Wait for completion of the 'silver' job
trans_silver_query.awaitTermination()
df_trans_silver.printSchema()

# 4.4 Show number of records
df_output = spark.read.parquet(trans_output_silver)
print(f"Row count: {df_output.count()}")
print(f"Query ID: {trans_silver_query.id}")
print(f"Query Name: {trans_silver_query.name}")
print(f"Query Status: {trans_silver_query.status}")

root
 |-- TerritoryID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- ProductID: long (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- Credit Card ExpMonth: long (nullable = true)
 |-- Credit Card ExpYear: long (nullable = true)
 |-- Credit Card Number: string (nullable = true)
 |-- Credit Card Type: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- DueDate: date (nullable = true)
 |-- Freight: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- SalesOrderID: long (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesPersonID: long (nullable = true)
 |-- ShipBase: double (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- ShipRate: double (nul

# STEP 5 - GOLD PHASE (PERFORM AGGREGATIONS/BUSINESS LOGIC)

In [17]:
# GOLD PHASE

# 5.1 Example business-level logic:
# - Get the first and last name of the top 10 customers that have spent the most in the US 
# territory. If the data warehouse does not contain a first and last name for the customer, do not
# include this value (as not all customers have first and last name data from the adventureworks
# database).

# 5.2 Write query to get business-level logic
from pyspark.sql.functions import sum as _sum, col
df_trans_gold = (
    spark.readStream
    .format("parquet")
    .load(trans_output_silver)
    .filter(col("CountryRegionCode") == 'US')
    .filter(col("FirstName").isNotNull())
    .filter(col("LastName").isNotNull())
    .groupby(col("CustomerID"), col("FirstName"), col("LastName"))
    .agg(_sum(col("TotalDue")).alias("TotalSpent"))
)

# 5.3 Output query to memory
trans_gold_query = (
    df_trans_gold.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("businessQuery")
    .start()
)

# 5.4 Wait until all of the query is in memory
wait_until_stream_is_ready(trans_gold_query, 1)

The stream has processed 1 batchs


In [19]:
# 5.5 Get business query data after stream finished
df_business_query = spark.sql("SELECT * FROM businessQuery")
df_business_query.printSchema()

# 5.6 Write business query as table
df_business_query.write.saveAsTable(f"{dest_database}.fact_business_query", mode="overwrite")

# 5.7 Show result of business query
spark.sql(f"SELECT * FROM {dest_database}.fact_business_query ORDER BY TotalSpent DESC LIMIT 10").toPandas()

# Why the TotalSpent column is the same:
# Yes, the total spent for the top-spending individuals are the same. I used a subset of all the fact sales data from the adventureworks database (999 total records in 3 .json files in the /data/adventureworks/streaming/ directory). This means I do not use all the fact sales data as I meant to use these files as an example of real-time streamed data in this directory for the system to process. This is why the output from this table is different from the midterm, despite using similar schemas (they don't use exactly the same data). 

root
 |-- CustomerID: long (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- TotalSpent: double (nullable = true)


Unnamed: 0,CustomerID,FirstName,LastName,TotalSpent
0,27606,Courtney,Edwards,3953.9884
1,27577,Patrick,Cook,3953.9884
2,27668,Kevin,Gonzalez,3953.9884
3,27625,Alexandria,Hughes,3953.9884
4,27611,Jack,Edwards,3953.9884
5,27605,Miguel,Martinez,3953.9884
6,27651,Jeremy,Murphy,3953.9884
7,27621,Edward,Brown,3953.9884
8,27646,Jackson,Li,3953.9884
9,27636,Paige,Murphy,3953.9884
