# Capstone Project, Annabelle Claypoole

In this project, I will design and populate a dimensional data lakehouse that represents retail transactions. The source data is a publicly available retail dataset from Kaggle, loaded into the `online_retail` Delta table.

In [0]:
# First I will import the necessary libraries

import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
# Next, I must instantiate global variables
import os

# Azure MySQL Server Connection Information ###################
# FIXME(security): credentials are committed in plain text. Each one can be
# overridden through an environment variable; move them to a secret store and
# remove the literal fallbacks before sharing this notebook.
jdbc_hostname = "jdbc:mysql://ds2002mysqlannabelle.mysql.database.azure.com"
jdbc_port = 3306
src_database = "northwind_dw2"

connection_properties = {
  "user" : os.environ.get("MYSQL_USER", "annabelleclaypoole"),
  "password" : os.environ.get("MYSQL_PASSWORD", "Passw0rd123"),
  # NOTE(review): MariaDB driver paired with a jdbc:mysql:// URL — confirm the
  # connector version in use accepts the mysql scheme.
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "devcluster2.yghivor"
atlas_database_name = "northwind_dw2"
atlas_user_name = os.environ.get("ATLAS_USER", "wry8wh")
atlas_password = os.environ.get("ATLAS_PASSWORD", "wry8wh")

# Data Files (JSON) Information ###############################
dst_database = "northwind_dlh"

base_dir = "dbfs:/FileStore/lab_data"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

orders_stream_dir = f"{stream_dir}/orders"
purchase_orders_stream_dir = f"{stream_dir}/purchase_orders"
inventory_trans_stream_dir = f"{stream_dir}/inventory_transactions"

orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

purchase_orders_output_bronze = f"{database_dir}/fact_purchase_orders/bronze"
purchase_orders_output_silver = f"{database_dir}/fact_purchase_orders/silver"
purchase_orders_output_gold   = f"{database_dir}/fact_purchase_orders/gold"

inventory_trans_output_bronze = f"{database_dir}/fact_inventory_transactions/bronze"
inventory_trans_output_silver = f"{database_dir}/fact_inventory_transactions/silver"
inventory_trans_output_gold   = f"{database_dir}/fact_inventory_transactions/gold"

# Reset the lakehouse output ##################################
# One recursive delete of database_dir also removes the fact_orders,
# fact_purchase_orders and fact_inventory_transactions sub-directories
# (they all live under database_dir), so separate per-table deletes are redundant.
dbutils.fs.rm(database_dir, True)

False

In [0]:
# Subsequently, I will define global functions

##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Args:
        user_id: Atlas user name (percent-escaped before being put in the URI).
        pwd: Atlas password (percent-escaped before being put in the URI).
        cluster_name: cluster host prefix, i.e. the part before ``.mongodb.net``.
        db_name: database to query.
        collection: collection name within ``db_name``.
        conditions: filter document passed to ``find``; falsy means "match all".
        projection: projection document passed to ``find``; falsy means "all fields".
        sort: sort specification passed to ``Cursor.sort``; falsy means unsorted.

    Returns:
        A ``pd.DataFrame`` built from the list of returned documents.
    """
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped, otherwise characters such as '@', ':'
    # or '/' in a user name or password corrupt the connection URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply filter, projection and sort independently. The previous branching
        # silently dropped the filter unless a projection was also supplied, and
        # dropped the sort unless both filter and projection were supplied.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialise the cursor while the client is still open.
        documents = list(cursor)
    finally:
        # Always release the connection, even if the query raises.
        client.close()

    return pd.DataFrame(documents)

##################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB collections from JSON files on disk.

    Each collection named in ``json_files`` is dropped and then repopulated from
    its file, so re-running replaces the data instead of appending duplicates.

    Args:
        user_id: Atlas user name (percent-escaped before being put in the URI).
        pwd: Atlas password (percent-escaped before being put in the URI).
        cluster_name: cluster host prefix, i.e. the part before ``.mongodb.net``.
        db_name: target database.
        src_file_path: directory containing the JSON files.
        json_files: mapping of collection name -> JSON file name relative to
            ``src_file_path``.

    Returns:
        The ``insert_many`` result for the last collection processed, or ``None``
        when ``json_files`` is empty.
    """
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped to survive inside the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)
    # Defined up front so an empty json_files mapping returns None instead of
    # raising UnboundLocalError.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_file = os.path.join(src_file_path, file_name)
            # JSON is UTF-8 by specification; don't rely on the platform default.
            with open(json_file, 'r', encoding='utf-8') as openfile:
                json_object = json.load(openfile)
            # NOTE(review): insert_many expects a non-empty list of documents, so each
            # file presumably holds a top-level JSON array — confirm the input format.
            result = db[collection_name].insert_many(json_object)
    finally:
        # Always release the connection, even if a file or insert fails.
        client.close()

    return result

In [0]:
# Load the source retail Delta table into a DataFrame
df = (
    spark.read
    .format("delta")
    .load("dbfs:/user/hive/warehouse/online_retail")
)

# Expose the DataFrame to SQL cells as a session-scoped temporary view
df.createOrReplaceTempView("MyDataTable")

# The next cell creates a table from this view using SQL

In [0]:

%sql

-- Materialise the MyDataTable temp view (registered by the previous Python cell) as a Delta table.
-- NOTE(review): CREATE LIVE TABLE is Delta Live Tables syntax; outside a DLT pipeline a plain
-- CREATE OR REPLACE TABLE is presumably intended — confirm. The target name is also identical
-- to the source view name, which makes the FROM reference ambiguous — consider renaming.
CREATE LIVE TABLE MyDataTable
USING DELTA
AS SELECT * FROM MyDataTable

Name,Type
InvoiceNo,string
StockCode,string
Description,string
Quantity,bigint
InvoiceDate,timestamp
UnitPrice,double
CustomerID,bigint
Country,string


# Date Dimension Table 


In [0]:
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, dayofmonth, dayofweek, expr, month, quarter, weekofyear, year

# Reuse the notebook's active Spark session (creates one only when none exists)
spark = SparkSession.builder.getOrCreate()

# Inclusive calendar range covered by the date dimension.
# NOTE(review): this range must cover every InvoiceDate in the fact table, otherwise
# fact rows will have no matching DateKey — confirm against the source data.
start_date = '2020-01-01'
end_date = '2030-12-31'

# spark.range() needs a Python int, not a Column expression, so the number of days
# is computed on the driver. The +1 makes the range include end_date.
num_days = (
    datetime.date.fromisoformat(end_date) - datetime.date.fromisoformat(start_date)
).days + 1
date_range = spark.range(num_days).selectExpr(
    f"date_add('{start_date}', CAST(id AS INT)) AS date_val"
)

# Create Date Dimension table: one row per calendar day, keyed by yyyyMMdd
date_val = col('date_val')
date_dimension_df = date_range.select(
    expr("CAST(DATE_FORMAT(date_val, 'yyyyMMdd') AS INT) AS DateKey"),
    date_val.alias('Date'),
    year(date_val).alias('Year'),
    quarter(date_val).alias('Quarter'),
    month(date_val).alias('Month'),
    # dayofmonth works on all Spark 3.x versions (functions.day needs Spark 3.5+)
    dayofmonth(date_val).alias('Day'),
    weekofyear(date_val).alias('WeekOfYear'),
    dayofweek(date_val).alias('DayOfWeek'),
    # Full month name (e.g. "January"), not the month number
    date_format(date_val, 'MMMM').alias('MonthName'),
    date_format(date_val, 'EEEE').alias('DayOfWeekName'),
)

# Write the Date Dimension table to Delta Lake (overwrite keeps the cell re-runnable)
date_dimension_df.write.format("delta").mode("overwrite").saveAsTable("DateDimensionTable")

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:429)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1225)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:958)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:582)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:685)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:703)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:435)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:433)
	at com.databricks.logging.Usag

# Customer Dimension Table


In [0]:
from pyspark.sql import SparkSession

# Reuse the notebook's active Spark session; appName only applies if a new one is created
spark = SparkSession.builder.appName("CustomerDimensionTable").getOrCreate()

# Read the source Delta table into a DataFrame
df = spark.read.format("delta") \
  .load("dbfs:/user/hive/warehouse/online_retail")

# Create a temporary view for the DataFrame
df.createOrReplaceTempView("customers")

# Build the customer dimension with one row per customer rather than one row per
# transaction line. Rows without a CustomerID cannot be keyed, so they are excluded.
# Placeholder attributes are cast to concrete types so the table schema does not
# contain untyped (void) NULL columns.
customer_table = (
    df.where("CustomerID IS NOT NULL")
    .selectExpr(
        "CAST(CustomerID AS INT) AS customer_id",
        # Placeholder: the source has no customer name, so the ID is reused as text
        "CAST(CustomerID AS STRING) AS customer_name",
        "CAST(NULL AS STRING) AS contact_name",
        "CAST(NULL AS STRING) AS address",
        "CAST(NULL AS STRING) AS city",
        "CAST(NULL AS STRING) AS postal_code",
        "Country AS country",
    )
    .distinct()
)

# Save the customer dimension table as a Delta table (overwrite keeps the cell re-runnable)
customer_table.write.format("delta").mode("overwrite").saveAsTable("customers_dimension")

# The shared notebook Spark session is intentionally NOT stopped: later cells still use it.

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2577413797799220>, line 14[0m
[1;32m     11[0m df[38;5;241m.[39mcreateOrReplaceTempView([38;5;124m"[39m[38;5;124mcustomers[39m[38;5;124m"[39m)
[1;32m     13[0m [38;5;66;03m# Create customer dimension table[39;00m
[0;32m---> 14[0m customer_table [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m     15[0m [38;5;124m    SELECT[39m
[1;32m     16[0m [38;5;124m        CAST(CustomerID AS INT) AS customer_id,[39m
[1;32m     17[0m [38;5;124m        customer_name,[39m
[1;32m     18[0m [38;5;124m        contact_name,[39m
[1;32m     19[0m [38;5;124m        address,[39m
[1;32m     20[0m [38;5;124m        city,[39m
[1;32m     21[0m [38;5;124m        postal_code,[39m
[1;32m     22[0m [38;5;124m        country[39m
[1;32m     23[0m [38;5;1

# Product Dimension Table

In [0]:
from pyspark.sql import SparkSession

# Reuse the notebook's active Spark session; appName only applies if a new one is created
spark = SparkSession.builder.appName("ProductDimensionTable").getOrCreate()

# Read the retail source table (the same Delta table the other cells use)
df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/online_retail")

# Create product dimension table, one row per distinct product.
# The source schema has no Category column, so product_category is a typed placeholder
# like the other attributes the source does not provide.
# NOTE(review): a StockCode that appears with several descriptions yields several rows.
product_table = (
    df.selectExpr(
        "StockCode AS product_code",
        "Description AS product_description",
        "CAST(NULL AS STRING) AS product_category",
        "CAST(NULL AS STRING) AS brand",
        "CAST(NULL AS STRING) AS color",
        "CAST(NULL AS STRING) AS size",
    )
    .distinct()
)

# Write the product dimension table to Delta Lake (overwrite keeps the cell re-runnable)
product_table.write.format("delta").mode("overwrite").saveAsTable("product_dimension")

# The shared notebook Spark session is intentionally NOT stopped: later cells still use it.

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:429)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1225)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:958)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:582)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:685)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:703)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:435)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:433)
	at com.databricks.logging.Usag

# Country Dimension Table

In [0]:
from pyspark.sql import SparkSession

# Reuse the notebook's active Spark session; appName only applies if a new one is created
spark = SparkSession.builder.appName("CountryDimensionTable").getOrCreate()

# Read the retail source table (the same Delta table the other cells use)
df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/online_retail")

# Create the country dimension table, one row per distinct country.
# Placeholder attributes are cast to concrete types; the chosen types are
# assumptions to revisit once real values are loaded.
country_table = (
    df.selectExpr(
        "Country AS country_name",
        "CAST(NULL AS STRING) AS region",
        "CAST(NULL AS BIGINT) AS population",
        "CAST(NULL AS DOUBLE) AS area",
    )
    .distinct()
)

# Write the country dimension table to Delta Lake (overwrite keeps the cell re-runnable)
country_table.write.format("delta").mode("overwrite").saveAsTable("country_dimension")

# The shared notebook Spark session is intentionally NOT stopped: later cells still use it.

# Fact Table

In [0]:
from pyspark.sql import SparkSession

# Reuse the notebook's active Spark session; appName only applies if a new one is created
spark = SparkSession.builder.appName("SalesFactTable").getOrCreate()

# Read the retail source table. Reading the typed Delta table (rather than a
# header-only CSV) keeps Quantity/UnitPrice numeric and InvoiceDate a timestamp.
df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/online_retail")

# Create the fact table: one row per invoice line, with snake_case column names
fact_table = df.selectExpr(
    "InvoiceNo AS invoice_number",
    "StockCode AS stock_code",
    "Description AS product_description",
    "Quantity AS quantity",
    "InvoiceDate AS invoice_date",
    "UnitPrice AS unit_price",
    "CustomerID AS customer_id",
    "Country AS country"
)

# Write the fact table to Delta Lake (overwrite keeps the cell re-runnable)
fact_table.write.format("delta").mode("overwrite").saveAsTable("sales_fact")

# The shared notebook Spark session is intentionally NOT stopped: later cells still use it.

# Total Sales by Product Query

In [0]:
%sql
-- Top 10 products (stock codes) by revenue. Column names match those written to
-- sales_fact (stock_code, unit_price, quantity, invoice_number); the first column
-- is aliased to ProductID to keep the original report header.
SELECT
  stock_code AS ProductID,
  SUM(unit_price * quantity) AS TotalSales,
  COUNT(DISTINCT invoice_number) AS NumberOfSales
FROM sales_fact
GROUP BY stock_code
ORDER BY TotalSales DESC
LIMIT 10;
