In [1]:
import getpass
import glob
import json
import os
import shutil
import sys
import time

import findspark
findspark.init()

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

from delta import configure_spark_with_delta_pip

In [2]:
# Connections
# Credentials are read from environment variables (or prompted for without echo) so that
# secrets are not stored in the notebook file. Non-secret settings keep their previous
# values as defaults and can be overridden through the environment.
# NOTE(review): the passwords previously hard-coded here should be treated as leaked and rotated.
def _secret(env_var: str, prompt: str) -> str:
    """Return the value of ``env_var``; if it is unset or empty, prompt for it without echo."""
    value = os.environ.get(env_var)
    return value if value else getpass.getpass(prompt)

mysql_args = {
    "uid": os.environ.get("AW_MYSQL_USER", "root"),
    "pwd": _secret("AW_MYSQL_PWD", "MySQL password: "),
    "hostname": os.environ.get("AW_MYSQL_HOST", "localhost"),
    "dbname": "adventureworks"
}

dw_name = "aw_dw"

mongodb_args = {
    "user_name": os.environ.get("AW_MONGO_USER", "dbrichthree"),
    "password": _secret("AW_MONGO_PWD", "MongoDB password: "),
    "cluster_name": "Cluster0",
    "cluster_subnet": "pjne9qy",
    "cluster_location": "atlas",
    "db_name": "aw_mongo",
    "collection": "",
    "null_column_threshold": 0.5
}

In [3]:
# Directories
# Lakehouse root: <cwd>/spark-warehouse/<dest_database>
dest_database = "aw_final_dlh"
sql_warehouse_dir = os.path.abspath("spark-warehouse")
database_dir = os.path.join(sql_warehouse_dir, dest_database)

# Landing folder read by the streaming source (lives outside database_dir).
stream_root = os.path.join(os.getcwd(), "stream_data")
sales_stream_dir = os.path.join(stream_root, "sales_order_detail")

# Dimension tables.
dim_dir = os.path.join(database_dir, "dim")
dim_date_path, dim_products_path, dim_suppliers_path = (
    os.path.join(dim_dir, dim_name) for dim_name in ("dim_date", "dim_products", "dim_suppliers")
)

# Fact table, one sub-folder per medallion layer.
fact_dir = os.path.join(database_dir, "fact_sales_order_lines")
fact_bronze_path, fact_silver_path, fact_gold_path = (
    os.path.join(fact_dir, layer) for layer in ("bronze", "silver", "gold")
)

# Streaming checkpoints.
chk_dir = os.path.join(database_dir, "_checkpoints")
bronze_chk = os.path.join(chk_dir, "fact_sales_bronze")

directories_to_create = (
    database_dir, stream_root, sales_stream_dir,
    dim_dir, fact_dir,
    dim_date_path, dim_products_path, dim_suppliers_path,
    fact_bronze_path, fact_silver_path, fact_gold_path,
    chk_dir, bronze_chk,
)
for directory in directories_to_create:
    os.makedirs(directory, exist_ok=True)

In [4]:
# Setup: helper functions
def remove_directory_tree(path: str):
    """Recursively delete ``path`` if it exists; do nothing otherwise.

    If ``shutil.rmtree`` raises ``PermissionError``, the directory is renamed to
    ``<path>__OLD__<YYYYmmdd_HHMMSS>`` instead, so ``path`` itself is freed up.
    Any error from that fallback move propagates to the caller.
    """
    if not os.path.exists(path):
        return
    try:
        shutil.rmtree(path)
    except PermissionError:
        stamp = time.strftime("%Y%m%d_%H%M%S")
        shutil.move(path, f"{path}__OLD__{stamp}")

def get_mongo_uri(**args):
    """Build a MongoDB connection URI from ``mongodb_args``-style keyword arguments.

    When ``cluster_location == "atlas"`` the result is
    ``mongodb+srv://<user_name>:<password>@<cluster_name>.<cluster_subnet>.mongodb.net/``.
    For any other location it is ``mongodb://localhost:27017/``.

    The user name and password are percent-encoded. Characters such as
    ``@ : / ? #`` in a credential would otherwise break the URI.

    Raises:
        KeyError: if ``cluster_location`` is missing, or, for Atlas, if any of
            ``user_name``, ``password``, ``cluster_name`` or ``cluster_subnet`` is missing.
    """
    from urllib.parse import quote_plus

    if args["cluster_location"] != "atlas":
        return "mongodb://localhost:27017/"

    user = quote_plus(args["user_name"])
    password = quote_plus(args["password"])
    return (
        f"mongodb+srv://{user}:{password}@"
        f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    )

def get_spark_conf_args(spark_jars: list, **args):
    """Collect the settings consumed by ``get_spark_conf``.

    Args:
        spark_jars: local JAR paths to place on ``spark.jars``.
        **args: MongoDB connection settings, forwarded to ``get_mongo_uri``.

    Returns:
        dict with these keys:
            ``app_name``: the application name.
            ``worker_threads``: a ``local[N]`` master URL using half the CPUs, at least 1.
            ``shuffle_partitions``: the CPU count.
            ``mongo_uri``: the URI built by ``get_mongo_uri``.
            ``spark_jars``: the jar paths as one comma-separated string.
            ``database_dir``: the module-level ``sql_warehouse_dir``.
    """
    # os.cpu_count() may return None. On a single-CPU host the old int(cpu/2)
    # produced "local[0]", which Spark rejects.
    cpu_count = os.cpu_count() or 1
    return {
        "app_name": "PySpark AdventureWorks Final (Streaming + Delta)",
        "worker_threads": f"local[{max(1, cpu_count // 2)}]",
        "shuffle_partitions": cpu_count,
        "mongo_uri": get_mongo_uri(**args),
        # spark.jars is a comma-separated list; join without padding spaces.
        "spark_jars": ",".join(spark_jars),
        "database_dir": sql_warehouse_dir
    }

def get_jdbc_url(**args):
    """Return the MySQL JDBC URL ``jdbc:mysql://<hostname>:<port>/<dbname>``.

    ``port`` is optional and defaults to 3306, so existing argument dicts
    without a ``port`` key produce the same URL as before.

    Raises:
        KeyError: if ``hostname`` or ``dbname`` is missing.
    """
    port = args.get("port", 3306)
    return f"jdbc:mysql://{args['hostname']}:{port}/{args['dbname']}"

def get_mysql_spark_df(spark_session, sql_query: str, **args):
    """Create a Spark DataFrame over the result of ``sql_query`` on MySQL via JDBC.

    Args:
        spark_session: the SparkSession whose reader is used.
        sql_query: SELECT statement passed through the JDBC ``query`` option.
        **args: connection settings. Reads ``uid`` and ``pwd``, plus the keys
            ``get_jdbc_url`` needs (``hostname``, ``dbname``).

    Returns:
        The DataFrame returned by ``DataFrameReader.load()`` for the JDBC source.
    """
    jdbc_options = {
        "url": get_jdbc_url(**args),
        "driver": "com.mysql.cj.jdbc.Driver",
        "user": args["uid"],
        "password": args["pwd"],
        "query": sql_query,
    }
    reader = spark_session.read.format("jdbc")
    for option_name, option_value in jdbc_options.items():
        reader = reader.option(option_name, option_value)
    return reader.load()

In [5]:
# Create a Spark Session
# Start every run from an empty lakehouse directory.
remove_directory_tree(database_dir)
os.makedirs(database_dir, exist_ok=True)

jars = []

# Locate the MySQL JDBC driver in $SPARK_HOME/jars (either artifact naming scheme).
SPARK_HOME = os.environ.get("SPARK_HOME", r"C:\spark-3.5.4-bin-hadoop3")
spark_jars_dir = os.path.join(SPARK_HOME, "jars")
mysql_candidates = [
    found
    for pattern in ("mysql-connector-j-*.jar", "mysql-connector-java-*.jar")
    for found in glob.glob(os.path.join(spark_jars_dir, pattern))
]

if not mysql_candidates:
    raise RuntimeError(f"No MySQL JDBC jar found in {spark_jars_dir}. Put mysql-connector-j-8.0.33.jar there.")

# NOTE(review): glob order is not guaranteed, so if several driver jars are present
# the one picked here is arbitrary.
jars.append(mysql_candidates[0])

def get_spark_conf(**args):
    """Build the SparkConf for the local notebook session.

    Args:
        **args: settings from ``get_spark_conf_args``. Reads ``app_name``,
            ``worker_threads`` (the master URL), ``spark_jars``, ``mongo_uri`` and
            ``shuffle_partitions``.

    Returns:
        A configured ``SparkConf``.

    NOTE(review): ``spark.sql.warehouse.dir`` comes from the module-level
    ``database_dir``. ``args['database_dir']`` is never read; confirm which
    location is intended.
    """
    conf = SparkConf().setAppName(args['app_name']).setMaster(args['worker_threads'])
    settings = {
        'spark.driver.memory': '4g',
        'spark.executor.memory': '2g',
        'spark.jars': args['spark_jars'],
        'spark.jars.packages': 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1',
        'spark.mongodb.input.uri': args['mongo_uri'],
        'spark.mongodb.output.uri': args['mongo_uri'],
        'spark.sql.adaptive.enabled': 'false',
        'spark.sql.debug.maxToStringFields': 50,
        'spark.sql.shuffle.partitions': args['shuffle_partitions'],
        'spark.sql.streaming.forceDeleteTempCheckpointLocation': 'true',
        'spark.sql.streaming.schemaInference': 'true',
        'spark.sql.warehouse.dir': database_dir,
        'spark.streaming.stopGracefullyOnShutdown': 'true',
        # Loopback-only driver, no web UI, generous port retries for a local run.
        "spark.driver.bindAddress": "127.0.0.1",
        "spark.driver.host": "127.0.0.1",
        "spark.ui.enabled": "false",
        "spark.port.maxRetries": "200",
    }
    for key, value in settings.items():
        conf.set(key, value)
    return conf

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)
sparkConf = get_spark_conf(**sparkConf_args)

# configure_spark_with_delta_pip() sets spark.jars.packages to the Delta artifact,
# replacing the value the builder copied from sparkConf (the Mongo connector).
# Hand those packages back through extra_packages so both are resolved.
conf_packages = sparkConf.get("spark.jars.packages") or ""
extra_packages = [pkg.strip() for pkg in conf_packages.split(",") if pkg.strip()]

builder = SparkSession.builder.config(conf=sparkConf) \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

spark = configure_spark_with_delta_pip(builder, extra_packages=extra_packages).getOrCreate()
# NOTE(review): "OFF" also hides Spark error logs; "ERROR" would keep failures visible.
spark.sparkContext.setLogLevel("OFF")

# Recreate the catalog database from scratch; tables are registered by LOCATION later.
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {dest_database};")
spark.sql(f"USE {dest_database};")

DataFrame[]

In [6]:
# Extracting SQL Data
# Source queries: products, vendors, and sales-order lines (header joined to detail).
sql_products = "SELECT ProductID AS product_id, Name AS product_name FROM product"
sql_vendor = "SELECT VendorID AS supplier_id, Name AS company FROM vendor"
sql_sales = """
SELECT sod.SalesOrderDetailID AS order_detail_id,
       soh.SalesOrderID AS order_id,
       sod.ProductID AS product_id,
       soh.OrderDate AS order_date,
       sod.OrderQty AS quantity,
       sod.UnitPrice AS unit_price
FROM salesorderheader AS soh
JOIN salesorderdetail AS sod
  ON soh.SalesOrderID = sod.SalesOrderID
"""

df_products_src, df_suppliers_src, df_sales_src = (
    get_mysql_spark_df(spark, source_query, **mysql_args)
    for source_query in (sql_products, sql_vendor, sql_sales)
)

In [7]:
# Dimensions
def _add_surrogate_key(df, natural_key: str, key_name: str):
    """Drop rows with nulls, dedupe on ``natural_key`` and add an int ``key_name`` numbered by it."""
    key_order = Window.orderBy(col(natural_key))
    return (
        df.dropna()
        .dropDuplicates([natural_key])
        .withColumn(key_name, row_number().over(key_order).cast("int"))
    )

# Calendar: one row per distinct order date seen in the MySQL sales lines.
df_dates = (
    df_sales_src
    .select(to_date(col("order_date")).alias("full_date"))
    .dropna()
    .dropDuplicates(["full_date"])
    .select(
        date_format(col("full_date"), "yyyyMMdd").cast("int").alias("date_key"),
        col("full_date"),
        year(col("full_date")).alias("year"),
        month(col("full_date")).alias("month"),
        dayofmonth(col("full_date")).alias("day"),
    )
)

df_products = _add_surrogate_key(df_products_src, "product_id", "product_key") \
    .select("product_key", "product_id", "product_name")

df_suppliers = _add_surrogate_key(df_suppliers_src, "supplier_id", "supplier_key") \
    .select("supplier_key", "supplier_id", "company")

In [8]:
# Load Dimensions
# Write each dimension as a Delta directory, then register it in the catalog by location.
dimension_targets = [
    ("dim_date", df_dates, dim_date_path),
    ("dim_products", df_products, dim_products_path),
    ("dim_suppliers", df_suppliers, dim_suppliers_path),
]

for _, dimension_df, target_path in dimension_targets:
    dimension_df.write.format("delta").mode("overwrite").save(target_path)

# Presumably converted to forward slashes so Windows backslashes are not read as
# escape characters inside the SQL string literal below.
dim_date_loc, dim_products_loc, dim_suppliers_loc = (
    target.replace("\\", "/") for _, _, target in dimension_targets
)

for table_name, table_loc in zip(
    (name for name, _, _ in dimension_targets),
    (dim_date_loc, dim_products_loc, dim_suppliers_loc),
):
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{table_loc}'")

DataFrame[]

In [9]:
# ---------- STREAM (FACT -> BRONZE: DELTA) ----------
# Explicit (all-nullable) schema for the JSON files in sales_stream_dir.
sales_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("SalesOrderDetailID", IntegerType()),
        ("SalesOrderID", IntegerType()),
        ("ProductID", IntegerType()),
        ("OrderDate", StringType()),
        ("OrderQty", IntegerType()),
        ("UnitPrice", DoubleType()),
    )
])

# Rename to the warehouse's snake_case columns and parse the order date.
bronze_columns = [
    "SalesOrderDetailID as order_detail_id",
    "SalesOrderID as order_id",
    "ProductID as product_id",
    "to_date(OrderDate) as order_date",
    "OrderQty as quantity",
    "UnitPrice as unit_price",
]

df_sales_stream = spark.readStream.schema(sales_schema).json(sales_stream_dir).selectExpr(*bronze_columns)

# availableNow: process the files available when the query starts, then stop,
# so awaitTermination() returns once that backlog is in the bronze Delta table.
bronze_writer = (
    df_sales_stream.writeStream
    .format("delta")
    .option("checkpointLocation", bronze_chk)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
)
bronze_query = bronze_writer.start(fact_bronze_path)

bronze_query.awaitTermination()


In [10]:
# Dim Products
# dim_products was already extracted, keyed and written to dim_products_path by the
# dimension cells above. Repeating the identical MySQL extract here only rewrote the
# same data as a new Delta version. Instead, verify the persisted dimension before the
# fact table joins against it: product_key and product_id must both be unique per row.
dim_products_check = (
    spark.read.format("delta").load(dim_products_path)
    .agg(
        count("*").alias("row_count"),
        countDistinct("product_key").alias("key_count"),
        countDistinct("product_id").alias("id_count"),
    )
    .first()
)

assert dim_products_check["row_count"] == dim_products_check["key_count"] == dim_products_check["id_count"], \
    f"dim_products keys are not unique: {dim_products_check.asDict()}"

In [11]:

# ---------- SILVER (FACT + DIM KEYS) ----------
df_bronze = spark.read.format("delta").load(fact_bronze_path)
df_dim_products = spark.read.format("delta").load(dim_products_path)

# Swap product_id for the surrogate product_key. Facts whose product_id is not in
# dim_products are dropped by the inner join.
product_lookup = df_dim_products.select("product_id", "product_key")
fact_key_order = Window.orderBy(col("order_detail_id"))

df_silver = (
    df_bronze
    .withColumn("order_date_key", date_format(col("order_date"), "yyyyMMdd").cast("int"))
    .join(product_lookup, on="product_id", how="inner")
    .select(
        row_number().over(fact_key_order).cast("int").alias("fact_sales_key"),
        "order_detail_id", "order_id", "product_key", "order_date_key", "quantity", "unit_price",
    )
)

df_silver.write.format("delta").mode("overwrite").save(fact_silver_path)

fact_silver_loc = fact_silver_path.replace("\\", "/")
spark.sql(f"CREATE TABLE IF NOT EXISTS fact_sales_silver USING DELTA LOCATION '{fact_silver_loc}'")

silver_row_count = spark.read.format("delta").load(fact_silver_path).count()
print(silver_row_count)



2


In [12]:
# Dim Date
# dim_date must cover every date the fact table can reference. That means the MySQL
# order dates *and* the order dates that arrived through the bronze stream. A streamed
# row dated outside salesorderheader would otherwise get an order_date_key with no
# dim_date row. It would then be dropped by any inner join against dim_date and get
# NULL year/month in gold's left join.
sql_dates = """
SELECT DISTINCT DATE(soh.OrderDate) AS full_date
FROM salesorderheader soh
"""
df_dates_src = get_mysql_spark_df(spark, sql_dates, **mysql_args)

df_stream_dates = (
    spark.read.format("delta").load(fact_bronze_path)
    .select(to_date(col("order_date")).alias("full_date"))
)

df_dates = (
    df_dates_src
    .select(to_date(col("full_date")).alias("full_date"))
    .unionByName(df_stream_dates)
    # A NULL date cannot form a date_key; drop it (the first dim_date build does the same).
    .dropna()
    .dropDuplicates(["full_date"])
    .withColumn("date_key", date_format(col("full_date"), "yyyyMMdd").cast("int"))
    .withColumn("year", year(col("full_date")))
    .withColumn("month", month(col("full_date")))
    .withColumn("day", dayofmonth(col("full_date")))
    .select("date_key", "full_date", "year", "month", "day")
)

df_dates.write.format("delta").mode("overwrite").save(dim_date_path)


In [13]:

# ---------- GOLD (AGGREGATES) ----------
# Quantity and estimated revenue per (year, month, product), computed from silver facts.
df_dim_date = spark.read.format("delta").load(dim_date_path)
df_dim_products = spark.read.format("delta").load(dim_products_path)
df_silver = spark.read.format("delta").load(fact_silver_path)

# Rename date_key so it joins on the fact's order_date_key column. The left joins keep
# facts with no matching dimension row (their year/month/product_name become NULL).
date_attrs = df_dim_date.select(col("date_key").alias("order_date_key"), "year", "month")
product_attrs = df_dim_products.select("product_key", "product_name")

# `sum` and `round` below are pyspark.sql.functions (star import), not the builtins.
line_revenue = col("quantity") * col("unit_price")
df_gold = (
    df_silver
    .join(date_attrs, "order_date_key", "left")
    .join(product_attrs, "product_key", "left")
    .groupBy("year", "month", "product_name")
    .agg(
        sum("quantity").alias("total_qty"),
        round(sum(line_revenue), 2).alias("revenue_est"),
    )
    .orderBy(col("revenue_est").desc())
)

df_gold.write.format("delta").mode("overwrite").save(fact_gold_path)

fact_gold_loc = fact_gold_path.replace("\\", "/")
spark.sql(f"CREATE TABLE IF NOT EXISTS fact_sales_gold USING DELTA LOCATION '{fact_gold_loc}'")

gold_row_count = spark.read.format("delta").load(fact_gold_path).count()
print(gold_row_count)


2


In [14]:
# ---------- ORIGINAL VALIDATION QUERIES ----------
# Make sure every table queried below is registered (IF NOT EXISTS: no-op when it already is).
fact_bronze_loc = fact_bronze_path.replace("\\", "/")
fact_silver_loc = fact_silver_path.replace("\\", "/")
dim_products_loc = dim_products_path.replace("\\", "/")
dim_date_loc = dim_date_path.replace("\\", "/")

for table_name, table_loc in [
    ("fact_sales_bronze", fact_bronze_loc),
    ("fact_sales_silver", fact_silver_loc),
    ("dim_products", dim_products_loc),
    ("dim_date", dim_date_loc),
]:
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{table_loc}'")

# Row count of each medallion layer.
for layer in ("bronze", "silver", "gold"):
    spark.sql(f"SELECT COUNT(*) AS {layer}_rows FROM fact_sales_{layer}").show()

# Top 25 (year, month, product) by estimated revenue. Inner joins: only facts
# with matching date and product dimension rows are counted.
spark.sql("""
SELECT
  d.year,
  d.month,
  p.product_name,
  SUM(f.quantity) AS total_qty,
  ROUND(SUM(f.quantity * f.unit_price), 2) AS revenue_est
FROM fact_sales_silver f
JOIN dim_date d     ON f.order_date_key = d.date_key
JOIN dim_products p ON f.product_key = p.product_key
GROUP BY d.year, d.month, p.product_name
ORDER BY revenue_est DESC
LIMIT 25
""").show(truncate=False)


+-----------+
|bronze_rows|
+-----------+
|          2|
+-----------+

+-----------+
|silver_rows|
+-----------+
|          2|
+-----------+

+---------+
|gold_rows|
+---------+
|        2|
+---------+

+----+-----+------------+---------+-----------+
|year|month|product_name|total_qty|revenue_est|
+----+-----+------------+---------+-----------+
+----+-----+------------+---------+-----------+

