Retail Sales Pipeline: Capstone

This project simulates a retail sales pipeline that combines streaming JSON sales data with external dimension tables from MongoDB and Azure SQL. The pipeline follows a Lakehouse architecture with Bronze, Silver, and Gold layers. The final output is saved as a Delta table that can be queried with SQL for analysis.

Data Sources
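Streaming JSON files (sales_fact), ingested with Databricks Auto Loader

MongoDB Atlas: customer_dim (NoSQL source; the notebook writes it to Azure SQL and reads it back from there for the joins)

Azure SQL Database:

product_dim

dim_date (stored as the DimDate table)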

Bronze Layer
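Ingested raw sales data from /FileStore/streaming/sales_fact/ with Auto Loader (cloudFiles) and readStream, using an explicit schema

Wrote the stream to an in-memory table, bronze_sales, using the memory sink so it can be queried from SQL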
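Because the Bronze stream is read with a fixed schema rather than an inferred one, every incoming record is projected onto the same seven fields. The plain-Python sketch below illustrates that idea only; it is not Auto Loader's implementation, and Spark's exact handling of mismatched values depends on the parser mode. Here, missing fields and values of the wrong JSON type become None, and extra fields are dropped. The sample records in the usage are hypothetical.

```python
import json
from typing import Any, Dict, Optional, Tuple

# Allowed JSON value types per field, mirroring the StructType in the notebook:
# strings for the IDs and the date, an integer quantity, and float prices.
SALES_SCHEMA: Dict[str, Tuple[type, ...]] = {
    "sale_id": (str,),
    "product_id": (str,),
    "customer_id": (str,),
    "date": (str,),
    "quantity": (int,),
    "unit_price": (int, float),
    "total_amount": (int, float),
}


def apply_schema(line: str) -> Dict[str, Optional[Any]]:
    """Project one JSON line onto the fixed sales schema.

    Missing fields and values of the wrong JSON type become None,
    and fields that are not in the schema are dropped.
    """
    raw = json.loads(line)
    row: Dict[str, Optional[Any]] = {}
    for field, allowed in SALES_SCHEMA.items():
        value = raw.get(field)
        # bool is a subclass of int in Python, so reject it explicitly
        if not isinstance(value, allowed) or isinstance(value, bool):
            row[field] = None
        elif float in allowed:
            row[field] = float(value)  # widen JSON integers such as 5 to 5.0
        else:
            row[field] = value
    return row


print(apply_schema('{"sale_id": "S2", "quantity": "two", "unit_price": 5}'))
```

The second record shows the failure modes: the string "two" cannot be a quantity, and the fields that were never sent come back as None.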

Silver Layer
Cleaned and transformed data using:

to_date() on date strings

regexp_replace() to strip the curly braces from Mongo-style customer_id values
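The two cleaning steps above can be mirrored in plain Python to see what they do to individual values. This is an illustrative sketch, not the notebook's Spark code: re.sub uses the same character class as regexp_replace("customer_id", "[{}]", ""), and datetime.strptime stands in for to_date(col, "yyyy-MM-dd"), returning None for strings that do not parse. The ObjectId-style value in the usage is made up.

```python
import re
from datetime import date, datetime
from typing import Optional


def clean_customer_id(raw_id: str) -> str:
    # Same pattern as regexp_replace("customer_id", "[{}]", ""): drop every { and }
    return re.sub(r"[{}]", "", raw_id)


def parse_sale_date(value: str) -> Optional[date]:
    # Rough analogue of to_date(col, "yyyy-MM-dd"); None when the string does not parse
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError:
        return None


print(clean_customer_id("{0123456789abcdef01234567}"))  # hypothetical ObjectId text
print(parse_sale_date("2023-01-05"))
```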

Joined bronze_sales with:

product_dim (via product_id)

customer_dim (via cleaned customer_id)

dim_date (via date = FullDate)

Final joined DataFrame: silver_sales
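Because silver_sales keeps only fact columns plus FullDate, the three inner joins act mainly as filters: a sale survives only if its product, customer, and date all exist in the dimensions. The plain-Python sketch below shows that behavior with set lookups. It assumes each dimension key is unique; with duplicate keys, Spark's inner join would instead emit one row per matching dimension row. The sample rows are hypothetical.

```python
from datetime import date
from typing import List, Set


def silver_join(
    sales: List[dict],
    product_ids: Set[str],
    customer_ids: Set[str],
    full_dates: Set[date],
) -> List[dict]:
    """Keep only sales whose product, customer, and date all exist in the dimensions."""
    silver = []
    for sale in sales:
        if (
            sale["product_id"] in product_ids
            and sale["customer_id"] in customer_ids
            and sale["date"] in full_dates
        ):
            # Same columns as the notebook's select(); the sale date is exposed as FullDate
            silver.append({
                "sale_id": sale["sale_id"],
                "product_id": sale["product_id"],
                "customer_id": sale["customer_id"],
                "FullDate": sale["date"],
                "quantity": sale["quantity"],
                "unit_price": sale["unit_price"],
                "total_amount": sale["total_amount"],
            })
    return silver
```

A sale with an unknown product_id or a date outside DimDate simply disappears from Silver, which is worth keeping in mind when comparing Bronze and Silver record counts.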

Gold Layer
Aggregated silver_sales to compute:

Total quantity sold per product

Total revenue per product

Average unit price

Saved final result as a Delta table: gold_product_summary
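The Gold aggregation is a per-product group-by with two sums and one average. The plain-Python sketch below reproduces it for non-null rows (Spark's sum and avg skip nulls, which this sketch does not handle). One detail it makes visible: avg_unit_price is a plain mean over sale rows, not revenue divided by quantity. The sample rows are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def gold_summary(silver_rows: List[dict]) -> Dict[str, dict]:
    """Per-product totals, like groupBy("product_id").agg(sum, sum, avg)."""
    qty: Dict[str, int] = defaultdict(int)
    revenue: Dict[str, float] = defaultdict(float)
    price_sum: Dict[str, float] = defaultdict(float)
    rows: Dict[str, int] = defaultdict(int)
    for r in silver_rows:
        pid = r["product_id"]
        qty[pid] += r["quantity"]
        revenue[pid] += r["total_amount"]
        price_sum[pid] += r["unit_price"]
        rows[pid] += 1
    return {
        pid: {
            "total_quantity_sold": qty[pid],
            "total_revenue": revenue[pid],
            # Plain mean over sale rows, not weighted by quantity
            "avg_unit_price": price_sum[pid] / rows[pid],
        }
        for pid in qty
    }


sample = [
    {"product_id": "P001", "quantity": 2, "unit_price": 10.0, "total_amount": 20.0},
    {"product_id": "P001", "quantity": 1, "unit_price": 12.0, "total_amount": 12.0},
]
print(gold_summary(sample)["P001"])
```

For the two sample rows, avg_unit_price is 11.0 while revenue divided by quantity is 32.0 / 3, about 10.67; which figure the report should show is a design choice.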

Access and Output
Queried the Gold table using %sql

Verified correctness through record counts and example outputs

Key Notes
Data schemas were aligned, and the fact keys (product_id, customer_id, date) map to the dimension keys (product_id, customer_id, FullDate)

MongoDB ObjectId strings were cleaned with regexp_replace() so they join on the customer_id values in the sales data

The pipeline follows the Lakehouse pattern: raw ingestion in Bronze, cleaning and joining in Silver, and aggregation in Gold, with the result persisted as a Delta table
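The key-alignment note above can be checked mechanically. For a fact-to-dimension join to behave as a one-to-many lookup, each dimension key should appear once, and every fact key should have a dimension row. The plain-Python sketch below reports both problems; in Spark, a groupBy(...).count() filtered to count > 1 and a "left_anti" join would be the analogous checks. The key lists in the usage are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List


def key_report(fact_keys: Iterable[str], dim_keys: Iterable[str]) -> Dict[str, List[str]]:
    """Flag keys that break a one-to-many fact/dimension relationship."""
    dim_counts = Counter(dim_keys)
    return {
        # Dimension keys appearing more than once multiply matching fact rows in a join
        "duplicate_dim_keys": sorted(k for k, n in dim_counts.items() if n > 1),
        # Fact keys with no dimension row are silently dropped by an inner join
        "orphan_fact_keys": sorted(set(fact_keys) - set(dim_counts)),
    }


print(key_report(["P001", "P002", "P099"], ["P001", "P002", "P001"]))
```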

%  IMPORTANT  %

If the Bronze stream is not running when the notebook is opened, restart it by re-running the Auto Loader block at Cell 9.

%  IMPORTANT  %

In [0]:

# mongo_uri: MongoDB Atlas connection string, defined in an earlier cell (not shown)
df_customers = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", mongo_uri) \
    .option("collection", "customers") \
    .load()

df_customers.show(5)


+--------------------+--------------------+------+--------------------+-------------------+--------------------+---------------+--------------------+----------------+
|                 _id|            accounts|active|             address|          birthdate|               email|           name|    tier_and_details|        username|
+--------------------+--------------------+------+--------------------+-------------------+--------------------+---------------+--------------------+----------------+
|{5ca4bbcea2dd94ee...|[371138, 324287, ...|  true|9286 Bethany Glen...|1977-03-02 02:20:31|arroyocolton@gmai...|  Elizabeth Ray|{0df078f33aa74a2e...|         fmiller|
|{5ca4bbcea2dd94ee...|            [116508]|  NULL|Unit 1047 Box 408...|1994-02-19 23:46:27|cooperalexis@hotm...|  Lindsay Cowan|{5d6a79083c26402b...|valenciajennifer|
|{5ca4bbcea2dd94ee...|[462501, 228290, ...|  NULL|55711 Janet Plaza...|1988-06-20 22:15:34|timothy78@hotmail...|Katherine David|                  {}|      hillrachel

In [0]:
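from pyspark.sql.functions import col, when, size, split

customer_dim = df_customers.select(
    # _id is a struct; casting it to string yields "{<ObjectId hex>}" (the braces are removed later)
    col("_id").cast("string").alias("customer_id"),
    col("name").alias("full_name"),
    col("email"),
    col("username"),
    # Second comma-separated part of the address; in the output below it holds state and ZIP (e.g. " CO 22939")
    when(size(split(col("address"), ",")) > 1, split(col("address"), ",")[1]).alias("city")
)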

customer_dim.show(5)


+--------------------+---------------+--------------------+----------------+---------+
|         customer_id|      full_name|               email|        username|     city|
+--------------------+---------------+--------------------+----------------+---------+
|{5ca4bbcea2dd94ee...|  Elizabeth Ray|arroyocolton@gmai...|         fmiller| CO 22939|
|{5ca4bbcea2dd94ee...|  Lindsay Cowan|cooperalexis@hotm...|valenciajennifer|     NULL|
|{5ca4bbcea2dd94ee...|Katherine David|timothy78@hotmail...|      hillrachel| CT 62716|
|{5ca4bbcea2dd94ee...|Leslie Martinez| tcrawford@gmail.com|    serranobrian|     NULL|
|{5ca4bbcea2dd94ee...|  Brad Cardenas|  dustin37@yahoo.com|   charleshudson| CT 53165|
+--------------------+---------------+--------------------+----------------+---------+
only showing top 5 rows
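In the output above, the city column holds values such as CO 22939 and CT 62716, which look like a state and ZIP code rather than a city, and each carries a leading space. The plain-Python sketch below mirrors the when(size(split(...)) > 1, split(...)[1]) expression to show why, assuming an address shaped like "street\ncity, STATE ZIP" (the addresses in the preview are truncated, so this shape is an assumption, and the sample address is made up).

```python
from typing import Optional


def city_segment(address: str) -> Optional[str]:
    # Mirrors when(size(split(address, ",")) > 1, split(address, ",")[1]):
    # the second comma-separated piece, or None when the address has no comma
    parts = address.split(",")
    return parts[1] if len(parts) > 1 else None


print(repr(city_segment("100 Sample Road\nExampletown, ZZ 00000")))  # hypothetical address
print(city_segment("Unit 1 Box 2"))
```

Under that assumed shape, the city sits at the end of the first piece, after the line break, so extracting it would need a different split (and a trim() to drop the leading space).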


In [0]:
customer_dim.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:sqlserver://capstone-sql-server.database.windows.net:1433;databaseName=retail_capstone_project") \
    .option("dbtable", "customer_dim") \
    .option("user", "CloudSA1b6248af") \
    .option("password", "Passw0rd123") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .save()


In [0]:
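from pyspark.sql.functions import col

# df_products: raw product data loaded in an earlier cell (not shown)
product_dim = df_products.select(
    col("product_id"),
    col("product_name"),
    col("category"),
    col("brand"),
    col("unit_price").cast("float")  # 32-bit float
)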

product_dim.show(5)


+----------+------------+---------------+------------+----------+
|product_id|product_name|       category|       brand|unit_price|
+----------+------------+---------------+------------+----------+
|      P001|   Product 1|    Electronics|AmazonBasics|     19.52|
|      P002|   Product 2|Office Supplies|    Logitech|     98.14|
|      P003|   Product 3| Home & Kitchen|     Philips|     80.14|
|      P004|   Product 4| Home & Kitchen|     Samsung|     46.49|
|      P005|   Product 5|    Electronics|AmazonBasics|     60.25|
+----------+------------+---------------+------------+----------+
only showing top 5 rows


In [0]:
product_dim.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:sqlserver://capstone-sql-server.database.windows.net:1433;databaseName=retail_capstone_project") \
    .option("dbtable", "product_dim") \
    .option("user", "CloudSA1b6248af") \
    .option("password", "Passw0rd123") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .save()


In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType

# Define the schema
schema = StructType() \
    .add("sale_id", StringType()) \
    .add("product_id", StringType()) \
    .add("customer_id", StringType()) \
    .add("date", StringType()) \
    .add("quantity", IntegerType()) \
    .add("unit_price", FloatType()) \
    .add("total_amount", FloatType())

# Path to uploaded JSON files
input_path = "/FileStore/streaming/sales_fact/"

# Bronze stream using AutoLoader
bronze_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .schema(schema) \
    .load(input_path)


In [0]:
# Bronze Layer: Stream Setup
# Starts the stream and writes it to an in-memory table (memory sink);
# queryName registers it as 'bronze_sales' for SQL access
query = bronze_df.writeStream \
    .format("memory") \
    .queryName("bronze_sales") \
    .outputMode("append") \
    .start()


Load Dimension Tables from Azure SQL

In [0]:
# Credentials are hardcoded for the capstone; prefer a secret store for anything shared
jdbc_options = {
    "url": "jdbc:sqlserver://capstone-sql-server.database.windows.net:1433;databaseName=retail_capstone_project",
    "user": "CloudSA1b6248af",
    "password": "Passw0rd123",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
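The same JDBC password appears in plain text in several cells. One way to keep it out of the notebook is to read the connection settings from the environment. The sketch below does that; the RETAIL_SQL_* variable names are hypothetical and must be set before the call (a missing one raises KeyError). On Databricks, a secret scope read with dbutils.secrets.get(scope=..., key=...) is the usual alternative.

```python
import os
from typing import Dict


def jdbc_options_from_env() -> Dict[str, str]:
    """Build JDBC options without hardcoding credentials.

    RETAIL_SQL_URL, RETAIL_SQL_USER, and RETAIL_SQL_PASSWORD are hypothetical
    variable names; os.environ[...] raises KeyError if any is unset.
    """
    return {
        "url": os.environ["RETAIL_SQL_URL"],
        "user": os.environ["RETAIL_SQL_USER"],
        "password": os.environ["RETAIL_SQL_PASSWORD"],
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
```

The returned dictionary has the same keys as jdbc_options, so it could be passed to .options(**...) in the same way.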

In [0]:
product_dim_sql = spark.read \
    .format("jdbc") \
    .options(**jdbc_options) \
    .option("dbtable", "product_dim") \
    .load()

product_dim_sql.show(5)


+----------+------------+---------------+------------+----------+
|product_id|product_name|       category|       brand|unit_price|
+----------+------------+---------------+------------+----------+
|      P001|   Product 1|    Electronics|AmazonBasics|     19.52|
|      P002|   Product 2|Office Supplies|    Logitech|     98.14|
|      P001|   Product 1|    Electronics|AmazonBasics|     19.52|
|      P002|   Product 2|Office Supplies|    Logitech|     98.14|
|      P003|   Product 3| Home & Kitchen|     Philips|     80.14|
+----------+------------+---------------+------------+----------+
only showing top 5 rows
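The preview above lists P001 and P002 twice. Because silver_sales joins on product_id with an inner join, each sale of a product that appears twice in product_dim_sql would come out of the join twice, doubling that product's quantity and revenue in the Gold table. The preview shows only five rows, so it does not reveal how many products are affected. The plain-Python sketch below shows the fan-out using the product IDs in the order they appear in the preview; the sales tuples are hypothetical. In Spark, calling product_dim_sql.dropDuplicates(["product_id"]) before the join would be one way to guard against it.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def joined_quantity(sales: List[Tuple[str, int]], dim_product_ids: Iterable[str]) -> Dict[str, int]:
    """Total quantity per product after an inner join against a product list that may repeat keys."""
    matches = Counter(dim_product_ids)
    totals: Dict[str, int] = {}
    for product_id, quantity in sales:
        n = matches[product_id]
        if n:  # no match: the inner join drops the sale
            # One output row per matching dimension row, so the quantity is counted n times
            totals[product_id] = totals.get(product_id, 0) + quantity * n
    return totals


# Product IDs in the order shown in the product_dim_sql preview
preview_ids = ["P001", "P002", "P001", "P002", "P003"]
print(joined_quantity([("P001", 2), ("P003", 1)], preview_ids))  # {'P001': 4, 'P003': 1}
```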


In [0]:
customer_dim_sql = spark.read \
    .format("jdbc") \
    .options(**jdbc_options) \
    .option("dbtable", "customer_dim") \
    .load()

customer_dim_sql.show(5)


+--------------------+---------------+--------------------+----------------+---------+
|         customer_id|      full_name|               email|        username|     city|
+--------------------+---------------+--------------------+----------------+---------+
|{5ca4bbcea2dd94ee...|  Elizabeth Ray|arroyocolton@gmai...|         fmiller| CO 22939|
|{5ca4bbcea2dd94ee...|  Lindsay Cowan|cooperalexis@hotm...|valenciajennifer|     NULL|
|{5ca4bbcea2dd94ee...|Katherine David|timothy78@hotmail...|      hillrachel| CT 62716|
|{5ca4bbcea2dd94ee...|Leslie Martinez| tcrawford@gmail.com|    serranobrian|     NULL|
|{5ca4bbcea2dd94ee...|  Brad Cardenas|  dustin37@yahoo.com|   charleshudson| CT 53165|
+--------------------+---------------+--------------------+----------------+---------+
only showing top 5 rows


In [0]:
dim_date_sql = spark.read \
    .format("jdbc") \
    .options(**jdbc_options) \
    .option("dbtable", "DimDate") \
    .load()

dim_date_sql.show(5)


+--------+----------+---+-----+---------+-------+----+-----------+
| DateKey|  FullDate|Day|Month|MonthName|Quarter|Year|WeekdayName|
+--------+----------+---+-----+---------+-------+----+-----------+
|20230101|2023-01-01|  1|    1|  January|      1|2023|     Sunday|
|20230102|2023-01-02|  2|    1|  January|      1|2023|     Monday|
|20230103|2023-01-03|  3|    1|  January|      1|2023|    Tuesday|
|20230104|2023-01-04|  4|    1|  January|      1|2023|  Wednesday|
|20230105|2023-01-05|  5|    1|  January|      1|2023|   Thursday|
+--------+----------+---+-----+---------+-------+----+-----------+
only showing top 5 rows


Prepare Date and Customer Keys

In [0]:
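from pyspark.sql.functions import to_date

# Querying the memory-sink table returns a static snapshot of bronze_sales;
# re-run this cell and the ones after it to include newly arrived files.
# Cast the date string to DateType for joining with DimDate.FullDate
bronze_typed = spark.sql("SELECT * FROM bronze_sales") \
    .withColumn("date", to_date("date", "yyyy-MM-dd"))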


In [0]:
from pyspark.sql.functions import regexp_replace

# Clean customer_id field by removing curly braces
customer_dim_clean = customer_dim_sql.withColumn(
    "customer_id", regexp_replace("customer_id", "[{}]", "")
)


In [0]:
dim_date_prepped = dim_date_sql.withColumn("FullDate", to_date("FullDate"))


Silver Join

In [0]:
# Inner joins: sales without a matching product, customer, or date are dropped
silver_sales = bronze_typed \
    .join(product_dim_sql, "product_id") \
    .join(customer_dim_clean, "customer_id") \
    .join(dim_date_prepped, bronze_typed["date"] == dim_date_prepped["FullDate"]) \
    .select(
        "sale_id",
        "product_id",
        "customer_id",
        "FullDate",
        # product_dim also has a unit_price column, so the fact columns are referenced explicitly
        bronze_typed["quantity"].alias("quantity"),
        bronze_typed["unit_price"].alias("unit_price"),
        bronze_typed["total_amount"].alias("total_amount")
    )



Gold Summary

In [0]:
# Use the F namespace; importing sum directly would shadow Python's built-in sum()
from pyspark.sql import functions as F

gold_product_summary = silver_sales.groupBy("product_id").agg(
    F.sum("quantity").alias("total_quantity_sold"),
    F.sum("total_amount").alias("total_revenue"),
    F.avg("unit_price").alias("avg_unit_price")
)

gold_product_summary.show(10)




+----------+-------------------+------------------+------------------+
|product_id|total_quantity_sold|     total_revenue|    avg_unit_price|
+----------+-------------------+------------------+------------------+
|      P012|                 10| 670.4000244140625| 67.04000091552734|
|      P016|                 24|             839.5| 37.17249941825867|
|      P007|                 14| 179.5999984741211|14.755000114440918|
|      P003|                 14| 878.6800231933594| 62.21500205993652|
|      P024|                 10| 780.5399932861328| 72.89000193277995|
|      P025|                  8| 591.8400268554688|  73.9800033569336|
|      P010|                 16|1256.0000305175781| 70.92666625976562|
|      P006|                 24|1642.3399963378906| 63.15749931335449|
|      P011|                 16| 686.4799957275391| 49.29999987284342|
|      P020|                 10| 608.2599945068359| 68.68999989827473|
+----------+-------------------+------------------+------------------+
only showing top 10 rows
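The long decimal tails in total_revenue and avg_unit_price come from storing unit_price and total_amount as FloatType, a 32-bit float, which cannot represent most two-decimal prices exactly. For example, the avg_unit_price shown for P012, 67.04000091552734, is exactly what 67.04 becomes as a 32-bit float, and 670.4000244140625 is the 32-bit value of 670.4. The sketch below reproduces both with the standard struct module. Declaring these columns as DecimalType(10, 2), or applying round() in the Gold aggregation, are two possible ways to present cleaner figures.

```python
import struct


def as_float32(x: float) -> float:
    # Round-trip through a 4-byte IEEE 754 float, as happens when a column is declared FloatType
    return struct.unpack("f", struct.pack("f", x))[0]


print(as_float32(67.04))  # 67.04000091552734
print(as_float32(670.4))  # 670.4000244140625
```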

Save Gold Layer as Delta Table

In [0]:
# Overwrite any previous gold table
gold_product_summary.write.format("delta").mode("overwrite").saveAsTable("gold_product_summary")


In [0]:
%sql
SELECT * FROM gold_product_summary LIMIT 10

product_id,total_quantity_sold,total_revenue,avg_unit_price
P012,10,670.4000244140625,67.04000091552734
P016,24,839.5,37.17249941825867
P007,14,179.5999984741211,14.755000114440918
P003,14,878.6800231933594,62.21500205993652
P024,10,780.5399932861328,72.89000193277995
P025,8,591.8400268554688,73.9800033569336
P010,16,1256.000030517578,70.92666625976562
P006,24,1642.3399963378906,63.15749931335449
P011,16,686.4799957275391,49.29999987284342
P020,10,608.2599945068359,68.68999989827473
