# PySpark Data Warehouse in Databricks By Ahmed Abdulwahid

### 🔍 Overview

This project builds a scalable data warehouse using PySpark in Databricks, enabling efficient data processing and analytics. 📊⚡

### ❓ Why PySpark?

Traditional SQL warehouses struggle with big data. 🏗️ PySpark solves this with distributed computing, ⚡ faster queries, and seamless scalability. 📈

### 🎯 Objectives

✅ Design a structured data warehouse with fact & dimension tables.
✅ Use PySpark for ETL (Extract, Transform, Load) 🔄.
✅ Optimize with Delta Lake, partitioning, and caching 🛠️.


###🏆 Outcome

A fast, scalable warehouse ready for big data analytics! 📊🔥



# Incremental Loading

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DecimalType
from datetime import datetime
from decimal import Decimal  

# Initialize SparkSession
spark = SparkSession.builder.appName("SalesOrders").enableHiveSupport().getOrCreate()

# Define schema
schema = StructType([
    StructField("OrderID", IntegerType(), False),
    StructField("OrderDate", DateType(), False),
    StructField("CustomerID", IntegerType(), False),
    StructField("CustomerName", StringType(), False),
    StructField("CustomerEmail", StringType(), False),
    StructField("ProductID", IntegerType(), False),
    StructField("ProductName", StringType(), False),
    StructField("ProductCategory", StringType(), False),
    StructField("RegionID", IntegerType(), False),
    StructField("RegionName", StringType(), False),
    StructField("Country", StringType(), False),
    StructField("Quantity", IntegerType(), False),
    StructField("UnitPrice", DecimalType(10,2), False),  
    StructField("TotalAmount", DecimalType(10,2), False)  
])


data = [
    (1, datetime(2025, 3, 1), 201, "John Doe", "john.doe@example.com", 301, "Laptop", "Electronics", 401, "North America", "USA", 2, Decimal("900.00"), Decimal("1800.00")),
    (2, datetime(2025, 3, 2), 202, "Jane Smith", "jane.smith@example.com", 302, "Smartphone", "Electronics", 402, "Europe", "Germany", 1, Decimal("600.00"), Decimal("600.00")),
    (3, datetime(2025, 3, 3), 203, "Robert Brown", "robert.brown@example.com", 303, "Tablet", "Electronics", 403, "Asia", "India", 3, Decimal("350.00"), Decimal("1050.00")),
    (4, datetime(2025, 3, 4), 201, "John Doe", "john.doe@example.com", 304, "Headphones", "Accessories", 401, "North America", "USA", 1, Decimal("120.00"), Decimal("120.00")),
    (5, datetime(2025, 3, 5), 204, "Emily Davis", "emily.davis@example.com", 305, "Gaming Console", "Electronics", 402, "Europe", "France", 1, Decimal("500.00"), Decimal("500.00")),
    (6, datetime(2025, 3, 6), 202, "Jane Smith", "jane.smith@example.com", 306, "Smartwatch", "Electronics", 403, "Asia", "China", 2, Decimal("250.00"), Decimal("500.00")),
    (7, datetime(2025, 3, 7), 205, "Michael Wilson", "michael.wilson@example.com", 301, "Laptop", "Electronics", 401, "North America", "Canada", 1, Decimal("900.00"), Decimal("900.00")),
    (8, datetime(2025, 3, 8), 206, "Sarah Johnson", "sarah.johnson@example.com", 307, "Monitor", "Accessories", 402, "Europe", "UK", 2, Decimal("200.00"), Decimal("400.00"))
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show DataFrame
display(df)

# Write DataFrame to Spark SQL Table
df.write.mode("overwrite").saveAsTable("Sales.Orders")

OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,ProductID,ProductName,ProductCategory,RegionID,RegionName,Country,Quantity,UnitPrice,TotalAmount
1,2025-03-01,201,John Doe,john.doe@example.com,301,Laptop,Electronics,401,North America,USA,2,900.0,1800.0
2,2025-03-02,202,Jane Smith,jane.smith@example.com,302,Smartphone,Electronics,402,Europe,Germany,1,600.0,600.0
3,2025-03-03,203,Robert Brown,robert.brown@example.com,303,Tablet,Electronics,403,Asia,India,3,350.0,1050.0
4,2025-03-04,201,John Doe,john.doe@example.com,304,Headphones,Accessories,401,North America,USA,1,120.0,120.0
5,2025-03-05,204,Emily Davis,emily.davis@example.com,305,Gaming Console,Electronics,402,Europe,France,1,500.0,500.0
6,2025-03-06,202,Jane Smith,jane.smith@example.com,306,Smartwatch,Electronics,403,Asia,China,2,250.0,500.0
7,2025-03-07,205,Michael Wilson,michael.wilson@example.com,301,Laptop,Electronics,401,North America,Canada,1,900.0,900.0
8,2025-03-08,206,Sarah Johnson,sarah.johnson@example.com,307,Monitor,Accessories,402,Europe,UK,2,200.0,400.0


In [0]:
from datetime import datetime
from decimal import Decimal

# New data to insert
new_data = [
    (9, datetime(2025, 3, 9), 207, "Daniel Martinez", "daniel.martinez@example.com", 308, "Keyboard", "Accessories", 403, "Asia", "Japan", 2, Decimal("50.00"), Decimal("100.00")),
    (10, datetime(2025, 3, 10), 208, "Olivia White", "olivia.white@example.com", 309, "Mouse", "Accessories", 401, "North America", "USA", 1, Decimal("40.00"), Decimal("40.00")),
    (11, datetime(2025, 3, 11), 209, "Liam Harris", "liam.harris@example.com", 310, "Printer", "Electronics", 402, "Europe", "Germany", 1, Decimal("150.00"), Decimal("150.00")),
    (12, datetime(2025, 3, 12), 210, "Sophia Clark", "sophia.clark@example.com", 311, "External Hard Drive", "Accessories", 403, "Asia", "India", 2, Decimal("120.00"), Decimal("240.00")),
    (13, datetime(2025, 3, 13), 211, "Benjamin Lewis", "benjamin.lewis@example.com", 312, "Graphics Card", "Electronics", 401, "North America", "Canada", 1, Decimal("700.00"), Decimal("700.00")),
    (14, datetime(2025, 3, 14), 212, "Emma Walker", "emma.walker@example.com", 313, "Webcam", "Accessories", 402, "Europe", "France", 1, Decimal("80.00"), Decimal("80.00")),
    (15, datetime(2025, 3, 15), 213, "Lucas Hall", "lucas.hall@example.com", 314, "Router", "Electronics", 403, "Asia", "China", 2, Decimal("90.00"), Decimal("180.00")),
    (16, datetime(2025, 3, 16), 214, "Mia Allen", "mia.allen@example.com", 315, "USB Flash Drive", "Accessories", 401, "North America", "USA", 5, Decimal("20.00"), Decimal("100.00")),
    (17, datetime(2025, 3, 17), 215, "Noah King", "noah.king@example.com", 316, "Projector", "Electronics", 402, "Europe", "UK", 1, Decimal("400.00"), Decimal("400.00")),
    (18, datetime(2025, 3, 18), 216, "Ava Scott", "ava.scott@example.com", 317, "Speakers", "Accessories", 403, "Asia", "Japan", 2, Decimal("150.00"), Decimal("300.00"))
]

# Create DataFrame for new data
new_df = spark.createDataFrame(new_data, schema=schema)

# Append new data to existing table
new_df.write.mode("append").saveAsTable("Sales.Orders")

# Display updated table
display(spark.sql("SELECT * FROM Sales.Orders ORDER BY OrderID;"))

OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,ProductID,ProductName,ProductCategory,RegionID,RegionName,Country,Quantity,UnitPrice,TotalAmount
1,2025-03-01,201,John Doe,john.doe@example.com,301,Laptop,Electronics,401,North America,USA,2,900.0,1800.0
2,2025-03-02,202,Jane Smith,jane.smith@example.com,302,Smartphone,Electronics,402,Europe,Germany,1,600.0,600.0
3,2025-03-03,203,Robert Brown,robert.brown@example.com,303,Tablet,Electronics,403,Asia,India,3,350.0,1050.0
4,2025-03-04,201,John Doe,john.doe@example.com,304,Headphones,Accessories,401,North America,USA,1,120.0,120.0
5,2025-03-05,204,Emily Davis,emily.davis@example.com,305,Gaming Console,Electronics,402,Europe,France,1,500.0,500.0
6,2025-03-06,202,Jane Smith,jane.smith@example.com,306,Smartwatch,Electronics,403,Asia,China,2,250.0,500.0
7,2025-03-07,205,Michael Wilson,michael.wilson@example.com,301,Laptop,Electronics,401,North America,Canada,1,900.0,900.0
8,2025-03-08,206,Sarah Johnson,sarah.johnson@example.com,307,Monitor,Accessories,402,Europe,UK,2,200.0,400.0
9,2025-03-09,207,Daniel Martinez,daniel.martinez@example.com,308,Keyboard,Accessories,403,Asia,Japan,2,50.0,100.0
9,2025-03-09,207,Daniel Martinez,daniel.martinez@example.com,308,Keyboard,Accessories,403,Asia,Japan,2,50.0,100.0


# Data Warehousing

In [0]:
# Drop the database if it exists (including all tables inside it)
spark.sql("DROP DATABASE IF EXISTS SalesDWH CASCADE")

# Create a new database for the Data Warehouse
spark.sql("CREATE DATABASE SalesDWH")

# Switch to the new database (Not required in Spark, but useful for SQL queries)
spark.sql("USE SalesDWH")

# Display available databases to confirm the change
display(spark.sql("SHOW DATABASES"))

databaseName
default
sales
salesdwh


## Staging Layer

In [0]:
# Create or replace the staging table in SalesDWH
spark.sql("""
    CREATE OR REPLACE TABLE SalesDWH.stg_sales AS
    SELECT * FROM Sales.Orders
    WHERE OrderDate > '2025-03-08'
""")

# Display the staging table to confirm the data is loaded
display(spark.sql("SELECT * FROM SalesDWH.stg_sales"))

OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,ProductID,ProductName,ProductCategory,RegionID,RegionName,Country,Quantity,UnitPrice,TotalAmount
11,2025-03-11,209,Liam Harris,liam.harris@example.com,310,Printer,Electronics,402,Europe,Germany,1,150.0,150.0
12,2025-03-12,210,Sophia Clark,sophia.clark@example.com,311,External Hard Drive,Accessories,403,Asia,India,2,120.0,240.0
13,2025-03-13,211,Benjamin Lewis,benjamin.lewis@example.com,312,Graphics Card,Electronics,401,North America,Canada,1,700.0,700.0
11,2025-03-11,209,Liam Harris,liam.harris@example.com,310,Printer,Electronics,402,Europe,Germany,1,150.0,150.0
12,2025-03-12,210,Sophia Clark,sophia.clark@example.com,311,External Hard Drive,Accessories,403,Asia,India,2,120.0,240.0
13,2025-03-13,211,Benjamin Lewis,benjamin.lewis@example.com,312,Graphics Card,Electronics,401,North America,Canada,1,700.0,700.0
16,2025-03-16,214,Mia Allen,mia.allen@example.com,315,USB Flash Drive,Accessories,401,North America,USA,5,20.0,100.0
17,2025-03-17,215,Noah King,noah.king@example.com,316,Projector,Electronics,402,Europe,UK,1,400.0,400.0
18,2025-03-18,216,Ava Scott,ava.scott@example.com,317,Speakers,Accessories,403,Asia,Japan,2,150.0,300.0
16,2025-03-16,214,Mia Allen,mia.allen@example.com,315,USB Flash Drive,Accessories,401,North America,USA,5,20.0,100.0


## Transformation Layer


In [0]:
# Create a view for transformed sales data
spark.sql("""
    CREATE OR REPLACE VIEW SalesDWH.trans_sales AS
    SELECT * FROM SalesDWH.stg_sales
    WHERE Quantity IS NOT NULL
""")

# Display the view to verify the data
display(spark.sql("SELECT * FROM SalesDWH.trans_sales"))

OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,ProductID,ProductName,ProductCategory,RegionID,RegionName,Country,Quantity,UnitPrice,TotalAmount
11,2025-03-11,209,Liam Harris,liam.harris@example.com,310,Printer,Electronics,402,Europe,Germany,1,150.0,150.0
12,2025-03-12,210,Sophia Clark,sophia.clark@example.com,311,External Hard Drive,Accessories,403,Asia,India,2,120.0,240.0
13,2025-03-13,211,Benjamin Lewis,benjamin.lewis@example.com,312,Graphics Card,Electronics,401,North America,Canada,1,700.0,700.0
11,2025-03-11,209,Liam Harris,liam.harris@example.com,310,Printer,Electronics,402,Europe,Germany,1,150.0,150.0
12,2025-03-12,210,Sophia Clark,sophia.clark@example.com,311,External Hard Drive,Accessories,403,Asia,India,2,120.0,240.0
13,2025-03-13,211,Benjamin Lewis,benjamin.lewis@example.com,312,Graphics Card,Electronics,401,North America,Canada,1,700.0,700.0
16,2025-03-16,214,Mia Allen,mia.allen@example.com,315,USB Flash Drive,Accessories,401,North America,USA,5,20.0,100.0
17,2025-03-17,215,Noah King,noah.king@example.com,316,Projector,Electronics,402,Europe,UK,1,400.0,400.0
18,2025-03-18,216,Ava Scott,ava.scott@example.com,317,Speakers,Accessories,403,Asia,Japan,2,150.0,300.0
16,2025-03-16,214,Mia Allen,mia.allen@example.com,315,USB Flash Drive,Accessories,401,North America,USA,5,20.0,100.0


## Core Layer


# Dimensional Modeling

### Dim customers

In [0]:
# Create the DimCustomers dimension table
spark.sql("""
    CREATE OR REPLACE TABLE SalesDWH.DimCustomers (
        DimCustomersKey INT,
        CustomerID INT,
        CustomerName STRING,
        CustomerEmail STRING
    )
""")

# Verify if the table is created successfully
display(spark.sql("DESCRIBE TABLE SalesDWH.DimCustomers"))

col_name,data_type,comment
DimCustomersKey,int,
CustomerID,int,
CustomerName,string,
CustomerEmail,string,


In [0]:
# Create the view for DimCustomers
spark.sql("""
    CREATE OR REPLACE VIEW SalesDWH.view_DimCustomers AS 
    SELECT 
        row_number() OVER (ORDER BY T.CustomerID) AS DimCustomersKey, 
        T.* 
    FROM (
        SELECT DISTINCT CustomerID, CustomerName, CustomerEmail
        FROM SalesDWH.trans_sales
    ) AS T
""")

# Verify if the view is created successfully
display(spark.sql("SELECT * FROM SalesDWH.view_DimCustomers"))

DimCustomersKey,CustomerID,CustomerName,CustomerEmail
1,207,Daniel Martinez,daniel.martinez@example.com
2,208,Olivia White,olivia.white@example.com
3,209,Liam Harris,liam.harris@example.com
4,210,Sophia Clark,sophia.clark@example.com
5,211,Benjamin Lewis,benjamin.lewis@example.com
6,212,Emma Walker,emma.walker@example.com
7,213,Lucas Hall,lucas.hall@example.com
8,214,Mia Allen,mia.allen@example.com
9,215,Noah King,noah.king@example.com
10,216,Ava Scott,ava.scott@example.com


In [0]:
# Insert data from the view into the DimCustomers table
spark.sql("""
    INSERT INTO SalesDWH.DimCustomers
    SELECT * FROM SalesDWH.view_DimCustomers
""")

# Verify insertion
display(spark.sql("SELECT * FROM SalesDWH.DimCustomers"))

DimCustomersKey,CustomerID,CustomerName,CustomerEmail
1,207,Daniel Martinez,daniel.martinez@example.com
2,208,Olivia White,olivia.white@example.com
3,209,Liam Harris,liam.harris@example.com
4,210,Sophia Clark,sophia.clark@example.com
5,211,Benjamin Lewis,benjamin.lewis@example.com
6,212,Emma Walker,emma.walker@example.com
7,213,Lucas Hall,lucas.hall@example.com
8,214,Mia Allen,mia.allen@example.com
9,215,Noah King,noah.king@example.com
10,216,Ava Scott,ava.scott@example.com


### Dim Products

In [0]:
# Create or replace the DimProducts table
spark.sql("""
    CREATE OR REPLACE TABLE SalesDWH.DimProducts (
        DimProductKey INT,
        ProductID INT,
        ProductName STRING,
        ProductCategory STRING
    )
""")

# Verify table creation
display(spark.sql("SHOW TABLES IN SalesDWH"))

database,tableName,isTemporary
salesdwh,dimcustomers,False
salesdwh,dimproducts,False
salesdwh,stg_sales,False
salesdwh,trans_sales,False
salesdwh,view_dimcustomers,False


In [0]:
# Create or replace the view for DimProduct
spark.sql("""
    CREATE OR REPLACE VIEW SalesDWH.view_DimProduct AS
    SELECT row_number() OVER (ORDER BY T.ProductID) AS DimProductKey, T.*
    FROM (
        SELECT DISTINCT(ProductID) AS ProductID, 
               ProductName, 
               ProductCategory
        FROM SalesDWH.trans_sales
    ) AS T
""")

# Verify view creation
display(spark.sql("SHOW VIEWS IN SalesDWH"))

namespace,viewName,isTemporary,isMaterialized
salesdwh,trans_sales,False,False
salesdwh,view_dimcustomers,False,False
salesdwh,view_dimproduct,False,False


In [0]:
# Insert data into DimProducts table
spark.sql("""
    INSERT INTO SalesDWH.DimProducts
    SELECT * FROM SalesDWH.view_DimProduct
""")

# Verify data insertion
display(spark.sql("SELECT * FROM SalesDWH.DimProducts"))

DimProductKey,ProductID,ProductName,ProductCategory
1,308,Keyboard,Accessories
2,309,Mouse,Accessories
3,310,Printer,Electronics
4,311,External Hard Drive,Accessories
5,312,Graphics Card,Electronics
6,313,Webcam,Accessories
7,314,Router,Electronics
8,315,USB Flash Drive,Accessories
9,316,Projector,Electronics
10,317,Speakers,Accessories


### Dim Region


In [0]:
# Create or replace the DimRegion table
spark.sql("""
    CREATE OR REPLACE TABLE SalesDWH.DimRegion (
        DimRegionKey INT,
        RegionID INT,
        RegionName STRING,
        Country STRING
    )
""")

# Verify table creation
display(spark.sql("SHOW TABLES IN SalesDWH"))

database,tableName,isTemporary
salesdwh,dimcustomers,False
salesdwh,dimproducts,False
salesdwh,dimregion,False
salesdwh,stg_sales,False
salesdwh,trans_sales,False
salesdwh,view_dimcustomers,False
salesdwh,view_dimproduct,False


In [0]:
# Create or replace the view for DimRegion
spark.sql("""
    CREATE OR REPLACE VIEW SalesDWH.view_DimRegion AS
    SELECT row_number() OVER (ORDER BY T.RegionID) AS DimRegionKey, T.*
    FROM (
        SELECT DISTINCT(RegionID) AS RegionID, 
               RegionName, 
               Country
        FROM SalesDWH.trans_sales
    ) AS T
""")

# Verify view creation
display(spark.sql("SHOW VIEWS IN SalesDWH"))

namespace,viewName,isTemporary,isMaterialized
salesdwh,trans_sales,False,False
salesdwh,view_dimcustomers,False,False
salesdwh,view_dimproduct,False,False
salesdwh,view_dimregion,False,False


In [0]:
# Insert data into DimRegion from view_DimRegion
spark.sql("""
    INSERT INTO SalesDWH.DimRegion
    SELECT * FROM SalesDWH.view_DimRegion
""")

# Verify data insertion
display(spark.sql("SELECT * FROM SalesDWH.DimRegion"))

DimRegionKey,RegionID,RegionName,Country
1,401,North America,Canada
2,401,North America,USA
3,402,Europe,Germany
4,402,Europe,UK
5,402,Europe,France
6,403,Asia,India
7,403,Asia,Japan
8,403,Asia,China


### Dim Date


In [0]:
# Create or replace the DimDate table
spark.sql("""
    CREATE OR REPLACE TABLE SalesDWH.DimDate (
        DimDateKey INT,
        OrderDate DATE
    )
""")

# Verify table creation
display(spark.sql("SHOW TABLES IN SalesDWH"))


database,tableName,isTemporary
salesdwh,dimcustomers,False
salesdwh,dimdate,False
salesdwh,dimproducts,False
salesdwh,dimregion,False
salesdwh,stg_sales,False
salesdwh,trans_sales,False
salesdwh,view_dimcustomers,False
salesdwh,view_dimproduct,False
salesdwh,view_dimregion,False


In [0]:
# Create or replace the view for DimDate
spark.sql("""
    CREATE OR REPLACE VIEW SalesDWH.view_DimDate AS
    SELECT row_number() OVER (ORDER BY T.OrderDate) AS DimDateKey, T.*
    FROM (
        SELECT DISTINCT(OrderDate) AS OrderDate
        FROM SalesDWH.trans_sales
    ) AS T
""")

# Verify view creation
display(spark.sql("SHOW VIEWS IN SalesDWH"))

namespace,viewName,isTemporary,isMaterialized
salesdwh,trans_sales,False,False
salesdwh,view_dimcustomers,False,False
salesdwh,view_dimproduct,False,False
salesdwh,view_dimregion,False,False
salesdwh,view_dimdate,False,False


In [0]:
# Insert data into DimDate table from view_DimDate
spark.sql("""
    INSERT INTO SalesDWH.DimDate
    SELECT * FROM SalesDWH.view_DimDate
""")

# Verify data insertion
display(spark.sql("SELECT DISTINCT(*) FROM SalesDWH.DimDate ORDER BY OrderDate"))

DimDateKey,OrderDate
1,2025-03-09
2,2025-03-10
3,2025-03-11
4,2025-03-12
5,2025-03-13
6,2025-03-14
7,2025-03-15
8,2025-03-16
9,2025-03-17
10,2025-03-18


### FACT Table


In [0]:
# Create FactSales table
spark.sql("""
    CREATE TABLE SalesDWH.FactSales (
        OrderID INT,
        Quantity DECIMAL(10,2),
        UnitPrice DECIMAL(10,2),
        TotalAmount DECIMAL(10,2),
        DimProductKey INT,
        DimDateKey INT,
        DimRegionKey INT,
        DimCustomerKey INT
    )
""")

# Verify table creation
display(spark.sql("SHOW TABLES IN SalesDWH"))

database,tableName,isTemporary
salesdwh,dimcustomers,False
salesdwh,dimdate,False
salesdwh,dimproducts,False
salesdwh,dimregion,False
salesdwh,factsales,False
salesdwh,stg_sales,False
salesdwh,trans_sales,False
salesdwh,view_dimcustomers,False
salesdwh,view_dimdate,False
salesdwh,view_dimproduct,False


In [0]:
# Join FactSales with dimension tables
fact_sales_query = """
    SELECT 
        F.OrderID,
        F.Quantity,
        F.UnitPrice,
        F.TotalAmount,
        DC.DimCustomersKey,
        DP.DimProductKey,
        DD.DimDateKey,
        DR.DimRegionKey
    FROM SalesDWH.trans_sales F
    LEFT JOIN SalesDWH.DimCustomers DC ON F.CustomerID = DC.CustomerID
    LEFT JOIN SalesDWH.DimProducts DP ON F.ProductID = DP.ProductID
    LEFT JOIN SalesDWH.DimDate DD ON F.OrderDate = DD.OrderDate
    LEFT JOIN SalesDWH.DimRegion DR ON F.Country = DR.Country
"""

# Execute query and display result
display(spark.sql(fact_sales_query))

OrderID,Quantity,UnitPrice,TotalAmount,DimCustomersKey,DimProductKey,DimDateKey,DimRegionKey
11,1,150.0,150.0,3,3,3,3
11,1,150.0,150.0,3,3,3,3
11,1,150.0,150.0,3,3,3,3
11,1,150.0,150.0,3,3,3,3
12,2,120.0,240.0,4,4,4,6
12,2,120.0,240.0,4,4,4,6
12,2,120.0,240.0,4,4,4,6
12,2,120.0,240.0,4,4,4,6
13,1,700.0,700.0,5,5,5,1
13,1,700.0,700.0,5,5,5,1
